From 1b4201aa7608261760b649543bc71593e86ed601 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 30 Aug 2020 15:51:28 +0200 Subject: [PATCH] improve performance by using path instead of pathName for packet routing --- client.go | 45 ++++++++++++--------------- main.go | 92 +++++++++++++++++++------------------------------------ path.go | 24 +++++++-------- source.go | 28 ++++++++--------- 4 files changed, 78 insertions(+), 111 deletions(-) diff --git a/client.go b/client.go index 5fe3d7e0..99633aaf 100644 --- a/client.go +++ b/client.go @@ -31,8 +31,7 @@ type udpClient struct { } type udpClientAddr struct { - // use a fixed-size array for ip comparison - ip [net.IPv6len]byte + ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator port int } @@ -110,7 +109,7 @@ type client struct { p *program conn *gortsplib.ConnServer state clientState - pathName string + path *path authUser string authPass string authHelper *gortsplib.AuthServer @@ -481,8 +480,8 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return true } - if c.pathName != "" && basePath != c.pathName { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathName, basePath)) + if c.path != nil && basePath != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) return false } @@ -614,9 +613,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - // after ANNOUNCE, c.pathName is already set - if basePath != c.pathName { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathName, basePath)) + // after ANNOUNCE, c.path is already set + if basePath != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) return false } @@ -648,7 +647,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.p.paths[c.pathName].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -706,7 +705,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.p.paths[c.pathName].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -759,17 +758,13 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { // path can end with a slash, remove it path = strings.TrimSuffix(path, "/") - if path != c.pathName { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathName, path)) + if path != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path)) return false } - // check publisher existence - res := make(chan error) - c.p.events <- programEventClientPlay1{res, c} - err := <-res - if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + if len(c.streamTracks) == 0 { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks have been setup")) return false } @@ -797,12 +792,12 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { // path can end with a slash, remove it path = strings.TrimSuffix(path, "/") - if path != c.pathName { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathName, path)) + if path != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path)) return false } - if len(c.streamTracks) != len(c.p.paths[c.pathName].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) != len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) return false } @@ -837,10 +832,10 @@ func (c *client) runPlay(path string) { // start sending frames only after sending the response to the PLAY request done := make(chan struct{}) - c.p.events <- programEventClientPlay2{done, c} + c.p.events <- programEventClientPlay{done, c} <-done - c.log("is receiving on path '%s', %d %s via %s", c.pathName, len(c.streamTracks), func() string { + c.log("is receiving on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -958,7 +953,7 @@ func (c *client) runRecord(path string) { c.p.events <- programEventClientRecord{done, c} <-done - c.log("is publishing on path '%s', %d %s via %s", c.pathName, len(c.streamTracks), func() string { + c.log("is publishing on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -1063,7 +1058,7 @@ func (c *client) runRecord(path string) { c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.p.events <- programEventClientFrameTcp{ - c.pathName, + c.path, frame.TrackId, frame.StreamType, frame.Content, diff --git a/main.go b/main.go index eb7a246d..1ce59d33 100644 --- a/main.go +++ b/main.go @@ -50,8 +50,8 @@ type programEventClientClose struct { func (programEventClientClose) isProgramEvent() {} type programEventClientDescribe struct { - client *client - path string + client *client + pathName string } func (programEventClientDescribe) isProgramEvent() {} @@ -59,7 +59,7 @@ func (programEventClientDescribe) isProgramEvent() {} type programEventClientAnnounce struct { res chan error client *client - path string + pathName string sdpText []byte sdpParsed *sdp.SessionDescription } @@ -67,10 +67,10 @@ type programEventClientAnnounce struct { func (programEventClientAnnounce) isProgramEvent() {} type programEventClientSetupPlay struct { - res chan error - client *client - path string - trackId int + res chan error + client *client + pathName string + trackId int } func (programEventClientSetupPlay) isProgramEvent() {} @@ -82,19 +82,12 @@ type programEventClientSetupRecord struct { func (programEventClientSetupRecord) isProgramEvent() {} -type programEventClientPlay1 struct { - res chan error - client *client -} - -func (programEventClientPlay1) isProgramEvent() {} - -type programEventClientPlay2 struct { +type programEventClientPlay struct { done chan struct{} client *client } -func (programEventClientPlay2) isProgramEvent() {} +func (programEventClientPlay) isProgramEvent() {} type programEventClientPlayStop struct { done chan struct{} @@ -126,7 +119,7 @@ type programEventClientFrameUdp struct { func (programEventClientFrameUdp) isProgramEvent() {} type programEventClientFrameTcp struct { - path string + path *path trackId int streamType gortsplib.StreamType buf []byte @@ -321,12 +314,8 @@ outer: case programEventClientClose: delete(p.clients, evt.client) - if evt.client.pathName != "" { - if path, ok := p.paths[evt.client.pathName]; ok { - if path.publisher == evt.client { - path.onPublisherRemove() - } - } + if evt.client.path != nil && evt.client.path.publisher == evt.client { + evt.client.path.onPublisherRemove() } evt.client.log("disconnected") @@ -334,31 +323,31 @@ outer: case programEventClientDescribe: // create path if not exist - if _, ok := p.paths[evt.path]; !ok { - p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false) + if _, ok := p.paths[evt.pathName]; !ok { + p.paths[evt.pathName] = newPath(p, evt.pathName, p.findConfForPathName(evt.pathName), false) } - p.paths[evt.path].onDescribe(evt.client) + p.paths[evt.pathName].onDescribe(evt.client) case programEventClientAnnounce: // create path if not exist - if path, ok := p.paths[evt.path]; !ok { - p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false) + if path, ok := p.paths[evt.pathName]; !ok { + p.paths[evt.pathName] = newPath(p, evt.pathName, p.findConfForPathName(evt.pathName), false) } else { if path.publisher != nil { - evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) + evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.pathName) continue } } - p.paths[evt.path].onPublisherNew(evt.client, evt.sdpText, evt.sdpParsed) + p.paths[evt.pathName].onPublisherNew(evt.client, evt.sdpText, evt.sdpParsed) evt.res <- nil case programEventClientSetupPlay: - path, ok := p.paths[evt.path] + path, ok := p.paths[evt.pathName] if !ok || !path.publisherReady { - evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.path) + evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.pathName) continue } @@ -367,7 +356,7 @@ outer: continue } - evt.client.pathName = evt.path + evt.client.path = path evt.client.state = clientStatePrePlay evt.res <- nil @@ -375,21 +364,7 @@ outer: evt.client.state = clientStatePreRecord evt.res <- nil - case programEventClientPlay1: - path, ok := p.paths[evt.client.pathName] - if !ok || !path.publisherReady { - evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.pathName) - continue - } - - if len(evt.client.streamTracks) == 0 { - evt.res <- fmt.Errorf("no tracks have been setup") - continue - } - - evt.res <- nil - - case programEventClientPlay2: + case programEventClientPlay: p.readerCount += 1 evt.client.state = clientStatePlay close(evt.done) @@ -421,7 +396,7 @@ outer: } } - p.paths[evt.client.pathName].onPublisherSetReady() + evt.client.path.onPublisherSetReady() close(evt.done) case programEventClientRecordStop: @@ -436,7 +411,7 @@ outer: delete(p.udpClientsByAddr, key) } } - p.paths[evt.client.pathName].onPublisherSetNotReady() + evt.client.path.onPublisherSetNotReady() close(evt.done) case programEventClientFrameUdp: @@ -451,21 +426,21 @@ outer: } pub.client.rtcpReceivers[pub.trackId].OnFrame(evt.streamType, evt.buf) - p.forwardFrame(pub.client.pathName, pub.trackId, evt.streamType, evt.buf) + p.forwardFrame(pub.client.path, pub.trackId, evt.streamType, evt.buf) case programEventClientFrameTcp: p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) case programEventSourceReady: evt.source.log("ready") - p.paths[evt.source.pathName].onPublisherSetReady() + evt.source.path.onPublisherSetReady() case programEventSourceNotReady: evt.source.log("not ready") - p.paths[evt.source.pathName].onPublisherSetNotReady() + evt.source.path.onPublisherSetNotReady() case programEventSourceFrame: - p.forwardFrame(evt.source.pathName, evt.trackId, evt.streamType, evt.buf) + p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf) case programEventTerminate: break outer @@ -494,10 +469,7 @@ outer: case programEventClientSetupRecord: evt.res <- fmt.Errorf("terminated") - case programEventClientPlay1: - evt.res <- fmt.Errorf("terminated") - - case programEventClientPlay2: + case programEventClientPlay: close(evt.done) case programEventClientPlayStop: @@ -564,9 +536,9 @@ func (p *program) findConfForPathName(name string) *confPath { return nil } -func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { +func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) { for c := range p.clients { - if c.pathName != path || + if c.path != path || c.state != clientStatePlay { continue } diff --git a/path.go b/path.go index 69f8693a..4d62d866 100644 --- a/path.go +++ b/path.go @@ -45,7 +45,7 @@ func newPath(p *program, name string, confp *confPath, permanent bool) *path { } if confp.Source != "record" { - s := newSource(p, name, confp) + s := newSource(p, pa, confp) pa.source = s pa.publisher = s } @@ -95,7 +95,7 @@ func (pa *path) onClose() { func (pa *path) hasClients() bool { for c := range pa.p.clients { - if c.pathName == pa.name { + if c.path == pa { return true } } @@ -104,7 +104,7 @@ func (pa *path) hasClients() bool { func (pa *path) hasClientsWaitingDescribe() bool { for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && c.pathName == pa.name { + if c.state == clientStateWaitingDescription && c.path == pa { return true } } @@ -113,7 +113,7 @@ func (pa *path) hasClientsWaitingDescribe() bool { func (pa *path) hasClientReaders() bool { for c := range pa.p.clients { - if c.pathName == pa.name && c != pa.publisher { + if c.path == pa && c != pa.publisher { return true } } @@ -126,8 +126,8 @@ func (pa *path) onCheck() { time.Since(pa.lastDescribeActivation) >= describeTimeout { for c := range pa.p.clients { if c.state == clientStateWaitingDescription && - c.pathName == pa.name { - c.pathName = "" + c.path == pa { + c.path = nil c.state = clientStateInitial c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} } @@ -169,7 +169,7 @@ func (pa *path) onPublisherNew(client *client, sdpText []byte, sdpParsed *sdp.Se pa.publisherSdpText = sdpText pa.publisherSdpParsed = sdpParsed - client.pathName = pa.name + client.path = pa client.state = clientStateAnnounce } @@ -183,8 +183,8 @@ func (pa *path) onPublisherSetReady() { // reply to all clients that are waiting for a description for c := range pa.p.clients { if c.state == clientStateWaitingDescription && - c.pathName == pa.name { - c.pathName = "" + c.path == pa { + c.path = nil c.state = clientStateInitial c.describeRes <- describeRes{pa.publisherSdpText, nil} } @@ -198,7 +198,7 @@ func (pa *path) onPublisherSetNotReady() { for c := range pa.p.clients { if c.state != clientStateWaitingDescription && c != pa.publisher && - c.pathName == pa.name { + c.path == pa { c.conn.NetConn().Close() } } @@ -227,7 +227,7 @@ func (pa *path) onDescribe(client *client) { } } - client.pathName = pa.name + client.path = pa client.state = clientStateWaitingDescription // no on-demand: reply with 404 @@ -244,7 +244,7 @@ func (pa *path) onDescribe(client *client) { pa.source.events <- sourceEventApplyState{pa.source.state} } - client.pathName = pa.name + client.path = pa client.state = clientStateWaitingDescription // publisher was found and is ready diff --git a/source.go b/source.go index 9bf1b89f..8b66231f 100644 --- a/source.go +++ b/source.go @@ -38,23 +38,23 @@ type sourceEventTerminate struct{} func (sourceEventTerminate) isSourceEvent() {} type source struct { - p *program - pathName string - confp *confPath - state sourceState - tracks []*gortsplib.Track + p *program + path *path + confp *confPath + state sourceState + tracks []*gortsplib.Track events chan sourceEvent done chan struct{} } -func newSource(p *program, pathName string, confp *confPath) *source { +func newSource(p *program, path *path, confp *confPath) *source { s := &source{ - p: p, - pathName: pathName, - confp: confp, - events: make(chan sourceEvent), - done: make(chan struct{}), + p: p, + path: path, + confp: confp, + events: make(chan sourceEvent), + done: make(chan struct{}), } if confp.SourceOnDemand { @@ -67,7 +67,7 @@ func newSource(p *program, pathName string, confp *confPath) *source { } func (s *source) log(format string, args ...interface{}) { - s.p.log("[source "+s.pathName+"] "+format, args...) + s.p.log("[source "+s.path.name+"] "+format, args...) } func (s *source) isPublisher() {} @@ -188,8 +188,8 @@ func (s *source) doInner(terminate chan struct{}) bool { serverSdpParsed, serverSdpText := sdpForServer(tracks) s.tracks = tracks - s.p.paths[s.pathName].publisherSdpText = serverSdpText - s.p.paths[s.pathName].publisherSdpParsed = serverSdpParsed + s.path.publisherSdpText = serverSdpText + s.path.publisherSdpParsed = serverSdpParsed if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp { return s.runUdp(terminate, conn)