diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index 667dc3ba..79684b0f 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -269,7 +269,7 @@ outer: sx := newWebRTCSession( m.ctx, m.readBufferCount, - req, + req.remoteAddr, &wg, m.iceHostNAT1To1IPs, m.iceUDPMux, @@ -385,15 +385,9 @@ func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes select { case m.chSessionNew <- req: - res1 := <-req.res - - select { - case res2 := <-req.res: - return res2 + res := <-req.res - case <-res1.sx.ctx.Done(): - return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} - } + return res.sx.new(req) case <-m.ctx.Done(): return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} @@ -420,7 +414,7 @@ func (m *webRTCManager) sessionAddCandidates( return res1 } - return res1.sx.addRemoteCandidates(req) + return res1.sx.addCandidates(req) case <-m.ctx.Done(): return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")} diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index d13af765..f0ce719e 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -113,7 +113,6 @@ type webRTCSessionPathManager interface { type webRTCSession struct { readBufferCount int - req webRTCSessionNewReq wg *sync.WaitGroup iceHostNAT1To1IPs []string iceUDPMux ice.UDPMux @@ -126,17 +125,19 @@ type webRTCSession struct { created time.Time uuid uuid.UUID secret uuid.UUID + req webRTCSessionNewReq answerSent bool mutex sync.RWMutex pc *peerConnection - chAddRemoteCandidates chan webRTCSessionAddCandidatesReq + chNew chan webRTCSessionNewReq + chAddCandidates chan webRTCSessionAddCandidatesReq } func newWebRTCSession( parentCtx context.Context, readBufferCount int, - req webRTCSessionNewReq, + remoteAddr string, wg *sync.WaitGroup, iceHostNAT1To1IPs []string, iceUDPMux ice.UDPMux, @@ -147,23 +148,23 @@ func newWebRTCSession( ctx, ctxCancel := context.WithCancel(parentCtx) s := &webRTCSession{ - readBufferCount: readBufferCount, - req: req, - wg: wg, - iceHostNAT1To1IPs: iceHostNAT1To1IPs, - iceUDPMux: iceUDPMux, - iceTCPMux: iceTCPMux, - parent: parent, - pathManager: pathManager, - ctx: ctx, - ctxCancel: ctxCancel, - created: time.Now(), - uuid: uuid.New(), - secret: uuid.New(), - chAddRemoteCandidates: make(chan webRTCSessionAddCandidatesReq), - } - - s.Log(logger.Info, "created by %s", req.remoteAddr) + readBufferCount: readBufferCount, + wg: wg, + iceHostNAT1To1IPs: iceHostNAT1To1IPs, + iceUDPMux: iceUDPMux, + iceTCPMux: iceTCPMux, + parent: parent, + pathManager: pathManager, + ctx: ctx, + ctxCancel: ctxCancel, + created: time.Now(), + uuid: uuid.New(), + secret: uuid.New(), + chNew: make(chan webRTCSessionNewReq), + chAddCandidates: make(chan webRTCSessionAddCandidatesReq), + } + + s.Log(logger.Info, "created by %s", remoteAddr) wg.Add(1) go s.run() @@ -183,7 +184,25 @@ func (s *webRTCSession) close() { func (s *webRTCSession) run() { defer s.wg.Done() - errStatusCode, err := s.runInner() + err := s.runInner() + + s.ctxCancel() + + s.parent.sessionClose(s) + + s.Log(logger.Info, "closed (%v)", err) +} + +func (s *webRTCSession) runInner() error { + select { + case req := <-s.chNew: + s.req = req + + case <-s.ctx.Done(): + return fmt.Errorf("terminated") + } + + errStatusCode, err := s.runInner2() if !s.answerSent { select { @@ -195,14 +214,10 @@ func (s *webRTCSession) run() { } } - s.ctxCancel() - - s.parent.sessionClose(s) - - s.Log(logger.Info, "closed (%v)", err) + return err } -func (s *webRTCSession) runInner() (int, error) { +func (s *webRTCSession) runInner2() (int, error) { if s.req.publish { return s.runPublish() } @@ -495,7 +510,7 @@ outer: func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { for { select { - case req := <-s.chAddRemoteCandidates: + case req := <-s.chAddCandidates: for _, candidate := range req.candidates { err := pc.AddICECandidate(*candidate) if err != nil { @@ -510,11 +525,23 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { } } -func (s *webRTCSession) addRemoteCandidates( +// new is called by webRTCHTTPServer through webRTCManager. +func (s *webRTCSession) new(req webRTCSessionNewReq) webRTCSessionNewRes { + select { + case s.chNew <- req: + return <-req.res + + case <-s.ctx.Done(): + return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} + } +} + +// addCandidates is called by webRTCHTTPServer through webRTCManager. +func (s *webRTCSession) addCandidates( req webRTCSessionAddCandidatesReq, ) webRTCSessionAddCandidatesRes { select { - case s.chAddRemoteCandidates <- req: + case s.chAddCandidates <- req: return <-req.res case <-s.ctx.Done():