diff --git a/client.go b/client.go index 3e50a9a1..1e98b1d7 100644 --- a/client.go +++ b/client.go @@ -135,6 +135,38 @@ func newClient(p *program, nconn net.Conn) *client { return c } +func (c *client) close() { + delete(c.p.clients, c) + + switch c.state { + case clientStatePlay: + c.p.readerCount -= 1 + + case clientStateRecord: + c.p.publisherCount -= 1 + + if c.streamProtocol == gortsplib.StreamProtocolUdp { + for _, track := range c.streamTracks { + key := makeUdpClientAddr(c.ip(), track.rtpPort) + delete(c.p.udpClientsByAddr, key) + + key = makeUdpClientAddr(c.ip(), track.rtcpPort) + delete(c.p.udpClientsByAddr, key) + } + } + + c.path.onPublisherSetNotReady() + } + + if c.path != nil && c.path.publisher == c { + c.path.onPublisherRemove() + } + + close(c.terminate) + + c.log("disconnected") +} + func (c *client) log(format string, args ...interface{}) { c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) } diff --git a/main.go b/main.go index 09a051b0..e8218333 100644 --- a/main.go +++ b/main.go @@ -291,7 +291,7 @@ outer: if _, ok := p.clients[evt.client]; !ok { continue } - p.closeClient(evt.client) + evt.client.close() case programEventClientDescribe: // create path if not exist @@ -429,7 +429,7 @@ outer: } for c := range p.clients { - p.closeClient(c) + c.close() <-c.done } @@ -466,38 +466,6 @@ func (p *program) findConfForPathName(name string) *confPath { return nil } -func (p *program) closeClient(client *client) { - delete(p.clients, client) - - switch client.state { - case clientStatePlay: - p.readerCount -= 1 - - case clientStateRecord: - p.publisherCount -= 1 - - if client.streamProtocol == gortsplib.StreamProtocolUdp { - for _, track := range client.streamTracks { - key := makeUdpClientAddr(client.ip(), track.rtpPort) - delete(p.udpClientsByAddr, key) - - key = makeUdpClientAddr(client.ip(), track.rtcpPort) - delete(p.udpClientsByAddr, key) - } - } - - client.path.onPublisherSetNotReady() - } - - if client.path != nil && client.path.publisher == client { - client.path.onPublisherRemove() - } - - close(client.terminate) - - client.log("disconnected") -} - func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) { for c := range p.clients { if c.path != path || diff --git a/path.go b/path.go index 9b876041..a15889c1 100644 --- a/path.go +++ b/path.go @@ -99,7 +99,7 @@ func (pa *path) onClose() { c.state = clientStateInitial c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} } else { - pa.p.closeClient(c) + c.close() } } } @@ -184,7 +184,7 @@ func (pa *path) onPublisherRemove() { if c.path == pa && c.state != clientStateWaitDescription && c != pa.publisher { - pa.p.closeClient(c) + c.close() } } } @@ -211,7 +211,7 @@ func (pa *path) onPublisherSetNotReady() { if c.path == pa && c.state != clientStateWaitDescription && c != pa.publisher { - pa.p.closeClient(c) + c.close() } } } diff --git a/source.go b/source.go index 53d2cbb7..ec8c5743 100644 --- a/source.go +++ b/source.go @@ -40,12 +40,12 @@ type source struct { func newSource(p *program, path *path, confp *confPath) *source { s := &source{ - p: p, - path: path, - confp: confp, - setState: make(chan sourceState), - terminate: make(chan struct{}), - done: make(chan struct{}), + p: p, + path: path, + confp: confp, + setState: make(chan sourceState), + terminate: make(chan struct{}), + done: make(chan struct{}), } if confp.SourceOnDemand {