Browse Source

speed up routing of udp frames

pull/205/head v0.9.8
aler9 6 years ago
parent
commit
2b2f8fcdf3
  1. 63
      main.go
  2. 14
      utils.go

63
main.go

@ -163,18 +163,19 @@ type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {} func (programEventTerminate) isProgramEvent() {}
type program struct { type program struct {
conf *conf conf *conf
logFile *os.File logFile *os.File
metrics *metrics metrics *metrics
serverRtsp *serverTcp serverRtsp *serverTcp
serverRtp *serverUdp serverRtp *serverUdp
serverRtcp *serverUdp serverRtcp *serverUdp
sources []*source sources []*source
clients map[*client]struct{} clients map[*client]struct{}
paths map[string]*path udpClientPublishers map[ipKey]*client
cmds []*exec.Cmd paths map[string]*path
publisherCount int cmds []*exec.Cmd
readerCount int publisherCount int
readerCount int
events chan programEvent events chan programEvent
done chan struct{} done chan struct{}
@ -200,11 +201,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
} }
p := &program{ p := &program{
conf: conf, conf: conf,
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
paths: make(map[string]*path), udpClientPublishers: make(map[ipKey]*client),
events: make(chan programEvent), paths: make(map[string]*path),
done: make(chan struct{}), events: make(chan programEvent),
done: make(chan struct{}),
} }
if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok { if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok {
@ -419,17 +421,19 @@ outer:
case programEventClientRecord: case programEventClientRecord:
p.publisherCount += 1 p.publisherCount += 1
evt.client.state = clientStateRecord evt.client.state = clientStateRecord
p.udpClientPublishers[makeIpKey(evt.client.ip())] = evt.client
p.paths[evt.client.pathId].publisherSetReady() p.paths[evt.client.pathId].publisherSetReady()
close(evt.done) close(evt.done)
case programEventClientRecordStop: case programEventClientRecordStop:
p.publisherCount -= 1 p.publisherCount -= 1
evt.client.state = clientStatePreRecord evt.client.state = clientStatePreRecord
delete(p.udpClientPublishers, makeIpKey(evt.client.ip()))
p.paths[evt.client.pathId].publisherSetNotReady() p.paths[evt.client.pathId].publisherSetNotReady()
close(evt.done) close(evt.done)
case programEventClientFrameUdp: case programEventClientFrameUdp:
client, trackId := p.findClientPublisher(evt.addr, evt.streamType) client, trackId := p.findUdpClientPublisher(evt.addr, evt.streamType)
if client == nil { if client == nil {
continue continue
} }
@ -544,31 +548,22 @@ func (p *program) findConfForPath(path string) *confPath {
return nil return nil
} }
func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) { func (p *program) findUdpClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) {
for _, path := range p.paths { c, ok := p.udpClientPublishers[makeIpKey(addr.IP)]
cl, ok := path.publisher.(*client) if ok {
if !ok { for i, t := range c.streamTracks {
continue
}
if cl.streamProtocol != gortsplib.StreamProtocolUdp ||
cl.state != clientStateRecord ||
!cl.ip().Equal(addr.IP) {
continue
}
for i, t := range cl.streamTracks {
if streamType == gortsplib.StreamTypeRtp { if streamType == gortsplib.StreamTypeRtp {
if t.rtpPort == addr.Port { if t.rtpPort == addr.Port {
return cl, i return c, i
} }
} else { } else {
if t.rtcpPort == addr.Port { if t.rtcpPort == addr.Port {
return cl, i return c, i
} }
} }
} }
} }
return nil, -1 return nil, -1
} }

14
utils.go

@ -147,3 +147,17 @@ func splitPath(path string) (string, string, error) {
return comps[0], comps[1], nil return comps[0], comps[1], nil
} }
// use a fixed-size array for ip comparison
type ipKey [net.IPv6len]byte
func makeIpKey(ip net.IP) ipKey {
var ret ipKey
if len(ip) == net.IPv4len {
copy(ret[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
copy(ret[12:], ip)
} else {
copy(ret[:], ip)
}
return ret
}

Loading…
Cancel
Save