Browse Source

webrtc: fix deadlock when a WHEP source fails (#3062) (#3108) (#3110)

Co-authored-by: Jonathan Martin <jonathan.martin@marss.com>
Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
pull/3120/head
Jonathan Martin 1 year ago committed by GitHub
parent
commit
732bf565bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 20
      internal/protocols/webrtc/peer_connection.go
  2. 33
      internal/protocols/webrtc/peer_connection_test.go
  3. 4
      internal/protocols/webrtc/whip_client.go

20
internal/protocols/webrtc/peer_connection.go

@ -36,9 +36,12 @@ type PeerConnection struct { @@ -36,9 +36,12 @@ type PeerConnection struct {
newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
closed chan struct{}
done chan struct{}
gatheringDone chan struct{}
incomingTrack chan trackRecvPair
ctx context.Context
ctxCancel context.CancelFunc
}
// Start starts the peer connection.
@ -56,10 +59,12 @@ func (co *PeerConnection) Start() error { @@ -56,10 +59,12 @@ func (co *PeerConnection) Start() error {
co.newLocalCandidate = make(chan *webrtc.ICECandidateInit)
co.connected = make(chan struct{})
co.disconnected = make(chan struct{})
co.closed = make(chan struct{})
co.done = make(chan struct{})
co.gatheringDone = make(chan struct{})
co.incomingTrack = make(chan trackRecvPair)
co.ctx, co.ctxCancel = context.WithCancel(context.Background())
if !co.Publish {
_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
@ -80,7 +85,7 @@ func (co *PeerConnection) Start() error { @@ -80,7 +85,7 @@ func (co *PeerConnection) Start() error {
co.wr.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case co.incomingTrack <- trackRecvPair{track, receiver}:
case <-co.closed:
case <-co.ctx.Done():
}
})
}
@ -90,7 +95,7 @@ func (co *PeerConnection) Start() error { @@ -90,7 +95,7 @@ func (co *PeerConnection) Start() error {
defer co.stateChangeMutex.Unlock()
select {
case <-co.closed:
case <-co.done:
return
default:
}
@ -108,7 +113,7 @@ func (co *PeerConnection) Start() error { @@ -108,7 +113,7 @@ func (co *PeerConnection) Start() error {
close(co.disconnected)
case webrtc.PeerConnectionStateClosed:
close(co.closed)
close(co.done)
}
})
@ -118,7 +123,7 @@ func (co *PeerConnection) Start() error { @@ -118,7 +123,7 @@ func (co *PeerConnection) Start() error {
select {
case co.newLocalCandidate <- &v:
case <-co.connected:
case <-co.closed:
case <-co.ctx.Done():
}
} else {
close(co.gatheringDone)
@ -130,8 +135,9 @@ func (co *PeerConnection) Start() error { @@ -130,8 +135,9 @@ func (co *PeerConnection) Start() error {
// Close closes the connection.
func (co *PeerConnection) Close() {
co.ctxCancel()
co.wr.Close() //nolint:errcheck
<-co.closed
<-co.done
}
// CreatePartialOffer creates a partial offer.

33
internal/protocols/webrtc/peer_connection_test.go

@ -0,0 +1,33 @@ @@ -0,0 +1,33 @@
package webrtc
import (
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require"
)
func TestPeerConnectionCloseAfterError(t *testing.T) {
api, err := NewAPI(APIConf{
LocalRandomUDP: true,
IPsFromInterfaces: true,
})
require.NoError(t, err)
pc := &PeerConnection{
API: api,
Publish: false,
Log: test.NilLogger{},
}
err = pc.Start()
require.NoError(t, err)
_, err = pc.CreatePartialOffer()
require.NoError(t, err)
// wait for ICE candidates to be generated
time.Sleep(500 * time.Millisecond)
pc.Close()
}

4
internal/protocols/webrtc/whip_client.go

@ -89,7 +89,7 @@ outer: @@ -89,7 +89,7 @@ outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(context.Background(), c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
if err != nil {
c.pc.Close()
return nil, err
@ -180,7 +180,7 @@ outer: @@ -180,7 +180,7 @@ outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(context.Background(), c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
if err != nil {
c.pc.Close()
return nil, err

Loading…
Cancel
Save