Browse Source

webrtc: fix race condition that caused random crashes during handshake (#2072)

pull/2073/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
5066ba403c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      internal/core/webrtc_manager.go
  2. 87
      internal/core/webrtc_session.go

14
internal/core/webrtc_manager.go

@ -269,7 +269,7 @@ outer: @@ -269,7 +269,7 @@ outer:
sx := newWebRTCSession(
m.ctx,
m.readBufferCount,
req,
req.remoteAddr,
&wg,
m.iceHostNAT1To1IPs,
m.iceUDPMux,
@ -385,15 +385,9 @@ func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes @@ -385,15 +385,9 @@ func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes
select {
case m.chSessionNew <- req:
res1 := <-req.res
select {
case res2 := <-req.res:
return res2
res := <-req.res
case <-res1.sx.ctx.Done():
return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
}
return res.sx.new(req)
case <-m.ctx.Done():
return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
@ -420,7 +414,7 @@ func (m *webRTCManager) sessionAddCandidates( @@ -420,7 +414,7 @@ func (m *webRTCManager) sessionAddCandidates(
return res1
}
return res1.sx.addRemoteCandidates(req)
return res1.sx.addCandidates(req)
case <-m.ctx.Done():
return webRTCSessionAddCandidatesRes{err: fmt.Errorf("terminated")}

87
internal/core/webrtc_session.go

@ -113,7 +113,6 @@ type webRTCSessionPathManager interface { @@ -113,7 +113,6 @@ type webRTCSessionPathManager interface {
type webRTCSession struct {
readBufferCount int
req webRTCSessionNewReq
wg *sync.WaitGroup
iceHostNAT1To1IPs []string
iceUDPMux ice.UDPMux
@ -126,17 +125,19 @@ type webRTCSession struct { @@ -126,17 +125,19 @@ type webRTCSession struct {
created time.Time
uuid uuid.UUID
secret uuid.UUID
req webRTCSessionNewReq
answerSent bool
mutex sync.RWMutex
pc *peerConnection
chAddRemoteCandidates chan webRTCSessionAddCandidatesReq
chNew chan webRTCSessionNewReq
chAddCandidates chan webRTCSessionAddCandidatesReq
}
func newWebRTCSession(
parentCtx context.Context,
readBufferCount int,
req webRTCSessionNewReq,
remoteAddr string,
wg *sync.WaitGroup,
iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux,
@ -147,23 +148,23 @@ func newWebRTCSession( @@ -147,23 +148,23 @@ func newWebRTCSession(
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &webRTCSession{
readBufferCount: readBufferCount,
req: req,
wg: wg,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
iceUDPMux: iceUDPMux,
iceTCPMux: iceTCPMux,
parent: parent,
pathManager: pathManager,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
uuid: uuid.New(),
secret: uuid.New(),
chAddRemoteCandidates: make(chan webRTCSessionAddCandidatesReq),
}
s.Log(logger.Info, "created by %s", req.remoteAddr)
readBufferCount: readBufferCount,
wg: wg,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
iceUDPMux: iceUDPMux,
iceTCPMux: iceTCPMux,
parent: parent,
pathManager: pathManager,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
uuid: uuid.New(),
secret: uuid.New(),
chNew: make(chan webRTCSessionNewReq),
chAddCandidates: make(chan webRTCSessionAddCandidatesReq),
}
s.Log(logger.Info, "created by %s", remoteAddr)
wg.Add(1)
go s.run()
@ -183,7 +184,25 @@ func (s *webRTCSession) close() { @@ -183,7 +184,25 @@ func (s *webRTCSession) close() {
func (s *webRTCSession) run() {
defer s.wg.Done()
errStatusCode, err := s.runInner()
err := s.runInner()
s.ctxCancel()
s.parent.sessionClose(s)
s.Log(logger.Info, "closed (%v)", err)
}
func (s *webRTCSession) runInner() error {
select {
case req := <-s.chNew:
s.req = req
case <-s.ctx.Done():
return fmt.Errorf("terminated")
}
errStatusCode, err := s.runInner2()
if !s.answerSent {
select {
@ -195,14 +214,10 @@ func (s *webRTCSession) run() { @@ -195,14 +214,10 @@ func (s *webRTCSession) run() {
}
}
s.ctxCancel()
s.parent.sessionClose(s)
s.Log(logger.Info, "closed (%v)", err)
return err
}
func (s *webRTCSession) runInner() (int, error) {
func (s *webRTCSession) runInner2() (int, error) {
if s.req.publish {
return s.runPublish()
}
@ -495,7 +510,7 @@ outer: @@ -495,7 +510,7 @@ outer:
func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
for {
select {
case req := <-s.chAddRemoteCandidates:
case req := <-s.chAddCandidates:
for _, candidate := range req.candidates {
err := pc.AddICECandidate(*candidate)
if err != nil {
@ -510,11 +525,23 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { @@ -510,11 +525,23 @@ func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
}
}
func (s *webRTCSession) addRemoteCandidates(
// new is called by webRTCHTTPServer through webRTCManager.
func (s *webRTCSession) new(req webRTCSessionNewReq) webRTCSessionNewRes {
select {
case s.chNew <- req:
return <-req.res
case <-s.ctx.Done():
return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
}
}
// addCandidates is called by webRTCHTTPServer through webRTCManager.
func (s *webRTCSession) addCandidates(
req webRTCSessionAddCandidatesReq,
) webRTCSessionAddCandidatesRes {
select {
case s.chAddRemoteCandidates <- req:
case s.chAddCandidates <- req:
return <-req.res
case <-s.ctx.Done():

Loading…
Cancel
Save