From d33ab5bec6cd2f37b93180aa7bb8407d12f5afc7 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 19 Nov 2020 14:02:31 +0100 Subject: [PATCH] drastically improve performance when reading streams with UDP --- internal/serverudp/server.go | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/internal/serverudp/server.go b/internal/serverudp/server.go index cb708dc7..94efa768 100644 --- a/internal/serverudp/server.go +++ b/internal/serverudp/server.go @@ -24,11 +24,6 @@ type publisherData struct { trackId int } -type bufAddrPair struct { - buf []byte - addr *net.UDPAddr -} - // Parent is implemented by program. type Parent interface { Log(string, ...interface{}) @@ -59,9 +54,7 @@ type Server struct { readBuf *multibuffer.MultiBuffer publishersMutex sync.RWMutex publishers map[publisherAddr]*publisherData - - // in - write chan bufAddrPair + writeMutex sync.Mutex // out done chan struct{} @@ -86,7 +79,6 @@ func New(writeTimeout time.Duration, pc: pc, readBuf: multibuffer.New(2, readBufferSize), publishers: make(map[publisherAddr]*publisherData), - write: make(chan bufAddrPair), done: make(chan struct{}), } @@ -111,15 +103,6 @@ func (s *Server) Close() { func (s *Server) run() { defer close(s.done) - writerDone := make(chan struct{}) - go func() { - defer close(writerDone) - for w := range s.write { - s.pc.SetWriteDeadline(time.Now().Add(s.writeTimeout)) - s.pc.WriteTo(w.buf, w.addr) - } - }() - for { buf := s.readBuf.Next() n, addr, err := s.pc.ReadFromUDP(buf) @@ -142,9 +125,6 @@ func (s *Server) run() { pubData.publisher.OnUdpPublisherFrame(pubData.trackId, s.streamType, buf[:n]) }() } - - close(s.write) - <-writerDone } // Port returns the server local port. @@ -153,8 +133,12 @@ func (s *Server) Port() int { } // Write writes a UDP packet. -func (s *Server) Write(data []byte, addr *net.UDPAddr) { - s.write <- bufAddrPair{data, addr} +func (s *Server) Write(buf []byte, addr *net.UDPAddr) { + s.writeMutex.Lock() + defer s.writeMutex.Unlock() + + s.pc.SetWriteDeadline(time.Now().Add(s.writeTimeout)) + s.pc.WriteTo(buf, addr) } // AddPublisher adds a publisher.