From 7743849e9a46ddea18f7e9481647431209ac3f55 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 1 Sep 2020 00:01:17 +0200 Subject: [PATCH] replace events with channels --- client.go | 59 ++++++-- main.go | 379 ++++++++++++++++++++++---------------------------- metrics.go | 6 +- server-tcp.go | 18 +-- server-udp.go | 16 +-- source.go | 21 ++- 6 files changed, 245 insertions(+), 254 deletions(-) diff --git a/client.go b/client.go index 1e98b1d7..dd4763b9 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,39 @@ const ( clientUdpWriteBufferSize = 128 * 1024 ) +type clientDescribeReq struct { + client *client + pathName string +} + +type clientAnnounceReq struct { + res chan error + client *client + pathName string + sdpText []byte + sdpParsed *sdp.SessionDescription +} + +type clientSetupPlayReq struct { + res chan error + client *client + pathName string + trackId int +} + +type clientFrameUdpReq struct { + addr *net.UDPAddr + streamType gortsplib.StreamType + buf []byte +} + +type clientFrameTcpReq struct { + path *path + trackId int + streamType gortsplib.StreamType + buf []byte +} + type udpClient struct { client *client trackId int @@ -362,7 +395,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { return nil } - c.p.events <- programEventClientDescribe{c, pathName} + c.p.clientDescribe <- clientDescribeReq{c, pathName} c.describeCSeq = cseq c.describeUrl = req.Url.String() @@ -435,7 +468,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { sdpParsed, req.Content = sdpForServer(tracks) res := make(chan error) - c.p.events <- programEventClientAnnounce{res, c, pathName, req.Content, sdpParsed} + c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, req.Content, sdpParsed} err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) @@ -527,7 +560,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId} + c.p.clientSetupPlay <- clientSetupPlayReq{res, c, basePath, trackId} err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) @@ -568,7 +601,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId} + c.p.clientSetupPlay <- clientSetupPlayReq{res, c, basePath, trackId} err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) @@ -826,7 +859,7 @@ func (c *client) runInitial() bool { if err != io.EOF && err != errRunTerminate { c.log("ERR: %s", err) } - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return false } @@ -865,7 +898,7 @@ func (c *client) runWaitDescription() bool { func (c *client) runPlay() bool { // start sending frames only after sending the response to the PLAY request - c.p.events <- programEventClientPlay{c} + c.p.clientPlay <- c c.log("is receiving on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { @@ -926,7 +959,7 @@ func (c *client) runPlayUdp() { if err != io.EOF && err != errRunTerminate { c.log("ERR: %s", err) } - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return @@ -978,7 +1011,7 @@ func (c *client) runPlayTcp() { for range c.tcpFrame { } }() - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return @@ -999,7 +1032,7 @@ func (c *client) runRecord() bool { c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } - c.p.events <- programEventClientRecord{c} + c.p.clientRecord <- c c.log("is publishing on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { @@ -1071,7 +1104,7 @@ func (c *client) runRecordUdp() { if err != io.EOF && err != errRunTerminate { c.log("ERR: %s", err) } - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return @@ -1081,7 +1114,7 @@ func (c *client) runRecordUdp() { c.log("ERR: stream is dead") c.conn.Close() <-readDone - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return } @@ -1132,7 +1165,7 @@ func (c *client) runRecordTcp() { } c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - c.p.events <- programEventClientFrameTcp{ + c.p.clientFrameTcp <- clientFrameTcpReq{ c.path, frame.TrackId, frame.StreamType, @@ -1159,7 +1192,7 @@ func (c *client) runRecordTcp() { if err != io.EOF && err != errRunTerminate { c.log("ERR: %s", err) } - c.p.events <- programEventClientClose{c} + c.p.clientClose <- c <-c.terminate return diff --git a/main.go b/main.go index e8218333..bc8189e0 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "time" "github.com/aler9/gortsplib" - "github.com/aler9/sdp/v3" "gopkg.in/alecthomas/kingpin.v2" ) @@ -26,108 +25,6 @@ const ( logDestinationFile ) -type programEvent interface { - isProgramEvent() -} - -type programEventMetrics struct { - res chan *metricsData -} - -func (programEventMetrics) isProgramEvent() {} - -type programEventClientNew struct { - nconn net.Conn -} - -func (programEventClientNew) isProgramEvent() {} - -type programEventClientClose struct { - client *client -} - -func (programEventClientClose) isProgramEvent() {} - -type programEventClientDescribe struct { - client *client - pathName string -} - -func (programEventClientDescribe) isProgramEvent() {} - -type programEventClientAnnounce struct { - res chan error - client *client - pathName string - sdpText []byte - sdpParsed *sdp.SessionDescription -} - -func (programEventClientAnnounce) isProgramEvent() {} - -type programEventClientSetupPlay struct { - res chan error - client *client - pathName string - trackId int -} - -func (programEventClientSetupPlay) isProgramEvent() {} - -type programEventClientPlay struct { - client *client -} - -func (programEventClientPlay) isProgramEvent() {} - -type programEventClientRecord struct { - client *client -} - -func (programEventClientRecord) isProgramEvent() {} - -type programEventClientFrameUdp struct { - addr *net.UDPAddr - streamType gortsplib.StreamType - buf []byte -} - -func (programEventClientFrameUdp) isProgramEvent() {} - -type programEventClientFrameTcp struct { - path *path - trackId int - streamType gortsplib.StreamType - buf []byte -} - -func (programEventClientFrameTcp) isProgramEvent() {} - -type programEventSourceReady struct { - source *source -} - -func (programEventSourceReady) isProgramEvent() {} - -type programEventSourceNotReady struct { - source *source -} - -func (programEventSourceNotReady) isProgramEvent() {} - -type programEventSourceFrame struct { - source *source - trackId int - streamType gortsplib.StreamType - buf []byte -} - -func (programEventSourceFrame) isProgramEvent() {} - -type programEventTerminate struct{} - -func (programEventTerminate) isProgramEvent() {} - type program struct { conf *conf logFile *os.File @@ -142,8 +39,21 @@ type program struct { publisherCount int readerCount int - events chan programEvent - done chan struct{} + metricsGather chan metricsGatherReq + clientNew chan net.Conn + clientClose chan *client + clientDescribe chan clientDescribeReq + clientAnnounce chan clientAnnounceReq + clientSetupPlay chan clientSetupPlayReq + clientPlay chan *client + clientRecord chan *client + clientFrameUdp chan clientFrameUdpReq + clientFrameTcp chan clientFrameTcpReq + sourceReady chan *source + sourceNotReady chan *source + sourceFrame chan sourceFrameReq + terminate chan struct{} + done chan struct{} } func newProgram(args []string, stdin io.Reader) (*program, error) { @@ -170,7 +80,20 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { paths: make(map[string]*path), clients: make(map[*client]struct{}), udpClientsByAddr: make(map[udpClientAddr]*udpClient), - events: make(chan programEvent), + metricsGather: make(chan metricsGatherReq), + clientNew: make(chan net.Conn), + clientClose: make(chan *client), + clientDescribe: make(chan clientDescribeReq), + clientAnnounce: make(chan clientAnnounceReq), + clientSetupPlay: make(chan clientSetupPlayReq), + clientPlay: make(chan *client), + clientRecord: make(chan *client), + clientFrameUdp: make(chan clientFrameUdpReq), + clientFrameTcp: make(chan clientFrameTcpReq), + sourceReady: make(chan *source), + sourceNotReady: make(chan *source), + sourceFrame: make(chan sourceFrameReq), + terminate: make(chan struct{}), done: make(chan struct{}), } @@ -273,143 +196,155 @@ outer: path.onCheck() } - case rawEvt := <-p.events: - switch evt := rawEvt.(type) { - case programEventMetrics: - evt.res <- &metricsData{ - clientCount: len(p.clients), - publisherCount: p.publisherCount, - readerCount: p.readerCount, - } + case req := <-p.metricsGather: + req.res <- &metricsData{ + clientCount: len(p.clients), + publisherCount: p.publisherCount, + readerCount: p.readerCount, + } - case programEventClientNew: - c := newClient(p, evt.nconn) - p.clients[c] = struct{}{} - c.log("connected") + case conn := <-p.clientNew: + c := newClient(p, conn) + p.clients[c] = struct{}{} + c.log("connected") - case programEventClientClose: - if _, ok := p.clients[evt.client]; !ok { - continue - } - evt.client.close() + case client := <-p.clientClose: + if _, ok := p.clients[client]; !ok { + continue + } + client.close() - case programEventClientDescribe: - // create path if not exist - if _, ok := p.paths[evt.pathName]; !ok { - p.paths[evt.pathName] = newPath(p, evt.pathName, p.findConfForPathName(evt.pathName), false) - } + case req := <-p.clientDescribe: + // create path if not exist + if _, ok := p.paths[req.pathName]; !ok { + p.paths[req.pathName] = newPath(p, req.pathName, p.findConfForPathName(req.pathName), false) + } - p.paths[evt.pathName].onDescribe(evt.client) + p.paths[req.pathName].onDescribe(req.client) - case programEventClientAnnounce: - // create path if not exist - if path, ok := p.paths[evt.pathName]; !ok { - p.paths[evt.pathName] = newPath(p, evt.pathName, p.findConfForPathName(evt.pathName), false) + case req := <-p.clientAnnounce: + // create path if not exist + if path, ok := p.paths[req.pathName]; !ok { + p.paths[req.pathName] = newPath(p, req.pathName, p.findConfForPathName(req.pathName), false) - } else { - if path.publisher != nil { - evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.pathName) - continue - } + } else { + if path.publisher != nil { + req.res <- fmt.Errorf("someone is already publishing on path '%s'", req.pathName) + continue } + } - p.paths[evt.pathName].publisher = evt.client - p.paths[evt.pathName].publisherSdpText = evt.sdpText - p.paths[evt.pathName].publisherSdpParsed = evt.sdpParsed + p.paths[req.pathName].publisher = req.client + p.paths[req.pathName].publisherSdpText = req.sdpText + p.paths[req.pathName].publisherSdpParsed = req.sdpParsed - evt.client.path = p.paths[evt.pathName] - evt.client.state = clientStatePreRecord - evt.res <- nil + req.client.path = p.paths[req.pathName] + req.client.state = clientStatePreRecord + req.res <- nil - case programEventClientSetupPlay: - path, ok := p.paths[evt.pathName] - if !ok || !path.publisherReady { - evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.pathName) - continue - } + case req := <-p.clientSetupPlay: + path, ok := p.paths[req.pathName] + if !ok || !path.publisherReady { + req.res <- fmt.Errorf("no one is publishing on path '%s'", req.pathName) + continue + } - if evt.trackId >= len(path.publisherSdpParsed.MediaDescriptions) { - evt.res <- fmt.Errorf("track %d does not exist", evt.trackId) - continue - } + if req.trackId >= len(path.publisherSdpParsed.MediaDescriptions) { + req.res <- fmt.Errorf("track %d does not exist", req.trackId) + continue + } + + req.client.path = path + req.client.state = clientStatePrePlay + req.res <- nil + + case client := <-p.clientPlay: + p.readerCount += 1 + client.state = clientStatePlay + + case client := <-p.clientRecord: + p.publisherCount += 1 + client.state = clientStateRecord + + if client.streamProtocol == gortsplib.StreamProtocolUdp { + for trackId, track := range client.streamTracks { + key := makeUdpClientAddr(client.ip(), track.rtpPort) + p.udpClientsByAddr[key] = &udpClient{ + client: client, + trackId: trackId, + streamType: gortsplib.StreamTypeRtp, + } - evt.client.path = path - evt.client.state = clientStatePrePlay - evt.res <- nil - - case programEventClientPlay: - p.readerCount += 1 - evt.client.state = clientStatePlay - - case programEventClientRecord: - p.publisherCount += 1 - evt.client.state = clientStateRecord - - if evt.client.streamProtocol == gortsplib.StreamProtocolUdp { - for trackId, track := range evt.client.streamTracks { - key := makeUdpClientAddr(evt.client.ip(), track.rtpPort) - p.udpClientsByAddr[key] = &udpClient{ - client: evt.client, - trackId: trackId, - streamType: gortsplib.StreamTypeRtp, - } - - key = makeUdpClientAddr(evt.client.ip(), track.rtcpPort) - p.udpClientsByAddr[key] = &udpClient{ - client: evt.client, - trackId: trackId, - streamType: gortsplib.StreamTypeRtcp, - } + key = makeUdpClientAddr(client.ip(), track.rtcpPort) + p.udpClientsByAddr[key] = &udpClient{ + client: client, + trackId: trackId, + streamType: gortsplib.StreamTypeRtcp, } } + } - evt.client.path.onPublisherSetReady() + client.path.onPublisherSetReady() - case programEventClientFrameUdp: - pub, ok := p.udpClientsByAddr[makeUdpClientAddr(evt.addr.IP, evt.addr.Port)] - if !ok { - continue - } + case req := <-p.clientFrameUdp: + pub, ok := p.udpClientsByAddr[makeUdpClientAddr(req.addr.IP, req.addr.Port)] + if !ok { + continue + } - // client sent RTP on RTCP port or vice-versa - if pub.streamType != evt.streamType { - continue - } + // client sent RTP on RTCP port or vice-versa + if pub.streamType != req.streamType { + continue + } - pub.client.rtcpReceivers[pub.trackId].OnFrame(evt.streamType, evt.buf) - p.forwardFrame(pub.client.path, pub.trackId, evt.streamType, evt.buf) + pub.client.rtcpReceivers[pub.trackId].OnFrame(req.streamType, req.buf) + p.forwardFrame(pub.client.path, pub.trackId, req.streamType, req.buf) - case programEventClientFrameTcp: - p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) + case req := <-p.clientFrameTcp: + p.forwardFrame(req.path, req.trackId, req.streamType, req.buf) - case programEventSourceReady: - evt.source.log("ready") - evt.source.path.onPublisherSetReady() + case source := <-p.sourceReady: + source.log("ready") + source.path.onPublisherSetReady() - case programEventSourceNotReady: - evt.source.log("not ready") - evt.source.path.onPublisherSetNotReady() + case source := <-p.sourceNotReady: + source.log("not ready") + source.path.onPublisherSetNotReady() - case programEventSourceFrame: - p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf) + case req := <-p.sourceFrame: + p.forwardFrame(req.source.path, req.trackId, req.streamType, req.buf) - case programEventTerminate: - break outer - } + case <-p.terminate: + break outer } } go func() { - for rawEvt := range p.events { - switch evt := rawEvt.(type) { - case programEventMetrics: - evt.res <- nil + for { + select { + case req, ok := <-p.metricsGather: + if !ok { + return + } + req.res <- nil + + case <-p.clientNew: + case <-p.clientClose: + case <-p.clientDescribe: + + case req := <-p.clientAnnounce: + req.res <- fmt.Errorf("terminated") - case programEventClientAnnounce: - evt.res <- fmt.Errorf("terminated") + case req := <-p.clientSetupPlay: + req.res <- fmt.Errorf("terminated") - case programEventClientSetupPlay: - evt.res <- fmt.Errorf("terminated") + case <-p.clientPlay: + case <-p.clientRecord: + case <-p.clientFrameUdp: + case <-p.clientFrameTcp: + case <-p.sourceReady: + case <-p.sourceNotReady: + case <-p.sourceFrame: } } }() @@ -445,12 +380,24 @@ outer: p.logFile.Close() } - close(p.events) + close(p.metricsGather) + close(p.clientNew) + close(p.clientClose) + close(p.clientDescribe) + close(p.clientAnnounce) + close(p.clientSetupPlay) + close(p.clientPlay) + close(p.clientRecord) + close(p.clientFrameUdp) + close(p.clientFrameTcp) + close(p.sourceReady) + close(p.sourceNotReady) + close(p.sourceFrame) close(p.done) } func (p *program) close() { - p.events <- programEventTerminate{} + close(p.terminate) <-p.done } diff --git a/metrics.go b/metrics.go index cd4de271..be83648f 100644 --- a/metrics.go +++ b/metrics.go @@ -19,6 +19,10 @@ type metricsData struct { readerCount int } +type metricsGatherReq struct { + res chan *metricsData +} + type metrics struct { p *program listener net.Listener @@ -61,7 +65,7 @@ func (m *metrics) close() { func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { res := make(chan *metricsData) - m.p.events <- programEventMetrics{res} + m.p.metricsGather <- metricsGatherReq{res} data := <-res if data == nil { diff --git a/server-tcp.go b/server-tcp.go index 8ad4f369..9f666360 100644 --- a/server-tcp.go +++ b/server-tcp.go @@ -5,14 +5,14 @@ import ( ) type serverTcp struct { - p *program - nconn *net.TCPListener + p *program + listener *net.TCPListener done chan struct{} } func newServerTcp(p *program) (*serverTcp, error) { - nconn, err := net.ListenTCP("tcp", &net.TCPAddr{ + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ Port: p.conf.RtspPort, }) if err != nil { @@ -20,9 +20,9 @@ func newServerTcp(p *program) (*serverTcp, error) { } l := &serverTcp{ - p: p, - nconn: nconn, - done: make(chan struct{}), + p: p, + listener: listener, + done: make(chan struct{}), } l.log("opened on :%d", p.conf.RtspPort) @@ -35,18 +35,18 @@ func (l *serverTcp) log(format string, args ...interface{}) { func (l *serverTcp) run() { for { - nconn, err := l.nconn.AcceptTCP() + conn, err := l.listener.AcceptTCP() if err != nil { break } - l.p.events <- programEventClientNew{nconn} + l.p.clientNew <- conn } close(l.done) } func (l *serverTcp) close() { - l.nconn.Close() + l.listener.Close() <-l.done } diff --git a/server-udp.go b/server-udp.go index ea0a19c9..13a44b9b 100644 --- a/server-udp.go +++ b/server-udp.go @@ -14,7 +14,7 @@ type udpAddrBufPair struct { type serverUdp struct { p *program - nconn *net.UDPConn + conn *net.UDPConn streamType gortsplib.StreamType readBuf *multiBuffer @@ -23,7 +23,7 @@ type serverUdp struct { } func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serverUdp, error) { - nconn, err := net.ListenUDP("udp", &net.UDPAddr{ + conn, err := net.ListenUDP("udp", &net.UDPAddr{ Port: port, }) if err != nil { @@ -32,7 +32,7 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve l := &serverUdp{ p: p, - nconn: nconn, + conn: conn, streamType: streamType, readBuf: newMultiBuffer(3, clientUdpReadBufferSize), writeChan: make(chan *udpAddrBufPair), @@ -58,19 +58,19 @@ func (l *serverUdp) run() { go func() { defer close(writeDone) for w := range l.writeChan { - l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) - l.nconn.WriteTo(w.buf, w.addr) + l.conn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) + l.conn.WriteTo(w.buf, w.addr) } }() for { buf := l.readBuf.next() - n, addr, err := l.nconn.ReadFromUDP(buf) + n, addr, err := l.conn.ReadFromUDP(buf) if err != nil { break } - l.p.events <- programEventClientFrameUdp{ + l.p.clientFrameUdp <- clientFrameUdpReq{ addr, l.streamType, buf[:n], @@ -84,7 +84,7 @@ func (l *serverUdp) run() { } func (l *serverUdp) close() { - l.nconn.Close() + l.conn.Close() <-l.done } diff --git a/source.go b/source.go index ec8c5743..51f6b41d 100644 --- a/source.go +++ b/source.go @@ -16,6 +16,13 @@ const ( sourceTcpReadBufferSize = 128 * 1024 ) +type sourceFrameReq struct { + source *source + trackId int + streamType gortsplib.StreamType + buf []byte +} + type sourceState int const ( @@ -226,7 +233,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { return true } - s.p.events <- programEventSourceReady{s} + s.p.sourceReady <- s var wg sync.WaitGroup @@ -245,7 +252,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { break } - s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} + s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} } }(trackId, rtpRead) } @@ -265,7 +272,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { break } - s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} + s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} } }(trackId, rtcpRead) } @@ -296,7 +303,7 @@ outer: wg.Wait() - s.p.events <- programEventSourceNotReady{s} + s.p.sourceNotReady <- s return ret } @@ -318,7 +325,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return true } - s.p.events <- programEventSourceReady{s} + s.p.sourceReady <- s frame := &gortsplib.InterleavedFrame{} multiBuf := newMultiBuffer(3, sourceTcpReadBufferSize) @@ -335,7 +342,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return } - s.p.events <- programEventSourceFrame{s, frame.TrackId, frame.StreamType, frame.Content} + s.p.sourceFrame <- sourceFrameReq{s, frame.TrackId, frame.StreamType, frame.Content} } }() @@ -358,7 +365,7 @@ outer: } } - s.p.events <- programEventSourceNotReady{s} + s.p.sourceNotReady <- s return ret }