diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 3b676e0f..293a48df 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -534,6 +534,12 @@ components: type: string remoteAddr: type: string + peerConnectionEstablished: + type: bool + localCandidate: + type: string + remoteCandidate: + type: string bytesReceived: type: number bytesSent: diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 69f0d88a..be0e599d 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -405,6 +405,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { "rtmp", "rtmps", "hls", + "webrtc", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -490,7 +491,6 @@ func TestAPIProtocolSpecificList(t *testing.T) { case "hls": source := gortsplib.Client{} - err := source.StartRecording("rtsp://localhost:8554/mypath", media.Medias{medi}) require.NoError(t, err) @@ -502,6 +502,17 @@ func TestAPIProtocolSpecificList(t *testing.T) { defer res.Body.Close() require.Equal(t, 200, res.StatusCode) }() + + case "webrtc": + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/mypath", + media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + c, err := newWebRTCTestClient("ws://localhost:8889/mypath/ws") + require.NoError(t, err) + defer c.close() } switch ca { @@ -562,6 +573,31 @@ func TestAPIProtocolSpecificList(t *testing.T) { s := fmt.Sprintf("^%d-", time.Now().Year()) require.Regexp(t, s, out.Items[firstID].Created) require.Regexp(t, s, out.Items[firstID].LastRequest) + + case "webrtc": + type item struct { + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + PeerConnectionEstablished bool `json:"peerConnectionEstablished"` + LocalCandidate string `json:"localCandidate"` + RemoteCandidate string `json:"remoteCandidate"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` + } + + var out struct { + Items map[string]item `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/webrtcconns/list", nil, &out) + require.NoError(t, err) + + var firstID string + for k := range out.Items { + firstID = k + } + + itm := out.Items[firstID] + require.Equal(t, true, itm.PeerConnectionEstablished) } }) } diff --git a/internal/core/core.go b/internal/core/core.go index 11f1109b..1d1b743b 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -565,7 +565,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closePathManager || closeMetrics - closeWebrtcServer := newConf == nil || + closeWebRTCServer := newConf == nil || newConf.WebRTCDisable != p.conf.WebRTCDisable || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.WebRTCAddress != p.conf.WebRTCAddress || @@ -590,7 +590,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closeRTSPSServer || closeRTMPServer || closeHLSServer || - closeWebrtcServer + closeWebRTCServer if newConf == nil && p.confWatcher != nil { p.confWatcher.Close() @@ -621,7 +621,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { p.pathManager = nil } - if closeWebrtcServer && p.webRTCServer != nil { + if closeWebRTCServer && p.webRTCServer != nil { p.webRTCServer.close() p.webRTCServer = nil } diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go index 7cb491f0..9dc53221 100644 --- a/internal/core/webrtc_conn.go +++ b/internal/core/webrtc_conn.go @@ -62,37 +62,6 @@ func newPeerConnection(configuration webrtc.Configuration, return api.NewPeerConnection(configuration) } -func describeActiveCandidates(pc *webrtc.PeerConnection) (string, string) { - var lcid string - var rcid string - - for _, stats := range pc.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { - lcid = tstats.LocalCandidateID - rcid = tstats.RemoteCandidateID - break - } - } - - var ldesc string - var rdesc string - - for _, stats := range pc.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidateStats); ok { - str := tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + - tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) - - if tstats.ID == lcid { - ldesc = str - } else if tstats.ID == rcid { - rdesc = str - } - } - } - - return ldesc, rdesc -} - type webRTCTrack struct { media *media.Media format format.Format @@ -187,13 +156,73 @@ func (c *webRTCConn) remoteAddr() net.Addr { return c.wsconn.RemoteAddr() } +func (c *webRTCConn) peerConnectionEstablished() bool { + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.curPC != nil +} + +func (c *webRTCConn) localCandidate() string { + c.mutex.RLock() + defer c.mutex.RUnlock() + + if c.curPC != nil { + var cid string + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.LocalCandidateID + break + } + } + + if cid != "" { + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + } + return "" +} + +func (c *webRTCConn) remoteCandidate() string { + c.mutex.RLock() + defer c.mutex.RUnlock() + + if c.curPC != nil { + var cid string + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.RemoteCandidateID + break + } + } + + if cid != "" { + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + } + return "" +} + func (c *webRTCConn) bytesReceived() uint64 { c.mutex.RLock() defer c.mutex.RUnlock() - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesReceived + + if c.curPC != nil { + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesReceived + } } } } @@ -203,10 +232,13 @@ func (c *webRTCConn) bytesReceived() uint64 { func (c *webRTCConn) bytesSent() uint64 { c.mutex.RLock() defer c.mutex.RUnlock() - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesSent + + if c.curPC != nil { + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesSent + } } } } @@ -335,10 +367,6 @@ func (c *webRTCConn) runInner(ctx context.Context) error { <-pcClosed }() - c.mutex.Lock() - c.curPC = pc - c.mutex.Unlock() - for _, track := range tracks { rtpSender, err := pc.AddTrack(track.webRTCTrack) if err != nil { @@ -440,8 +468,12 @@ outer: // in order to allow the other side of the connection // to switch to the "connected" state before WebSocket is closed. - ldesc, rdesc := describeActiveCandidates(pc) - c.log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", ldesc, rdesc) + c.mutex.Lock() + c.curPC = pc + c.mutex.Unlock() + + c.log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", + c.localCandidate(), c.remoteCandidate()) ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) defer ringBuffer.Close() diff --git a/internal/core/webrtc_server.go b/internal/core/webrtc_server.go index 12a96820..51b0073b 100644 --- a/internal/core/webrtc_server.go +++ b/internal/core/webrtc_server.go @@ -32,10 +32,13 @@ var upgrader = websocket.Upgrader{ } type webRTCServerAPIConnsListItem struct { - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + PeerConnectionEstablished bool `json:"peerConnectionEstablished"` + LocalCandidate string `json:"localCandidate"` + RemoteCandidate string `json:"remoteCandidate"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` } type webRTCServerAPIConnsListData struct { @@ -264,10 +267,13 @@ outer: for c := range s.conns { data.Items[c.uuid.String()] = webRTCServerAPIConnsListItem{ - Created: c.created, - RemoteAddr: c.remoteAddr().String(), - BytesReceived: c.bytesReceived(), - BytesSent: c.bytesSent(), + Created: c.created, + RemoteAddr: c.remoteAddr().String(), + PeerConnectionEstablished: c.peerConnectionEstablished(), + LocalCandidate: c.localCandidate(), + RemoteCandidate: c.remoteCandidate(), + BytesReceived: c.bytesReceived(), + BytesSent: c.bytesSent(), } } diff --git a/internal/core/webrtc_server_test.go b/internal/core/webrtc_server_test.go index fda3c5f6..42bb235c 100644 --- a/internal/core/webrtc_server_test.go +++ b/internal/core/webrtc_server_test.go @@ -14,49 +14,43 @@ import ( "github.com/stretchr/testify/require" ) -func TestWebRTCServer(t *testing.T) { - p, ok := newInstance("paths:\n" + - " all:\n") - require.Equal(t, true, ok) - defer p.Close() +type webRTCTestClient struct { + wc *websocket.Conn + pc *webrtc.PeerConnection + track chan *webrtc.TrackRemote +} - medi := &media.Media{ - Type: media.TypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, +func newWebRTCTestClient(addr string) (*webRTCTestClient, error) { + wc, _, err := websocket.DefaultDialer.Dial(addr, nil) //nolint:bodyclose + if err != nil { + return nil, err } - v := gortsplib.TransportTCP - source := gortsplib.Client{ - Transport: &v, + _, msg, err := wc.ReadMessage() + if err != nil { + wc.Close() + return nil, err } - err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi}) - require.NoError(t, err) - defer source.Close() - - c, _, err := websocket.DefaultDialer.Dial("ws://localhost:8889/stream/ws", nil) //nolint:bodyclose - require.NoError(t, err) - defer c.Close() - - _, msg, err := c.ReadMessage() - require.NoError(t, err) var iceServers []webrtc.ICEServer err = json.Unmarshal(msg, &iceServers) - require.NoError(t, err) + if err != nil { + wc.Close() + return nil, err + } pc, err := newPeerConnection(webrtc.Configuration{ ICEServers: iceServers, }) - require.NoError(t, err) - defer pc.Close() + if err != nil { + wc.Close() + return nil, err + } pc.OnICECandidate(func(i *webrtc.ICECandidate) { if i != nil { enc, _ := json.Marshal(i.ToJSON()) - c.WriteMessage(websocket.TextMessage, enc) + wc.WriteMessage(websocket.TextMessage, enc) } }) @@ -67,39 +61,71 @@ func TestWebRTCServer(t *testing.T) { } }) - track := make(chan *webrtc.TrackRemote) + track := make(chan *webrtc.TrackRemote, 1) pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) { track <- trak }) _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } localOffer, err := pc.CreateOffer(nil) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } enc, err := json.Marshal(localOffer) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } - err = c.WriteMessage(websocket.TextMessage, enc) - require.NoError(t, err) + err = wc.WriteMessage(websocket.TextMessage, enc) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } err = pc.SetLocalDescription(localOffer) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } - _, msg, err = c.ReadMessage() - require.NoError(t, err) + _, msg, err = wc.ReadMessage() + if err != nil { + wc.Close() + pc.Close() + return nil, err + } var remoteOffer webrtc.SessionDescription err = json.Unmarshal(msg, &remoteOffer) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } err = pc.SetRemoteDescription(remoteOffer) - require.NoError(t, err) + if err != nil { + wc.Close() + pc.Close() + return nil, err + } go func() { for { - _, msg, err := c.ReadMessage() + _, msg, err := wc.ReadMessage() if err != nil { return } @@ -116,6 +142,44 @@ func TestWebRTCServer(t *testing.T) { <-connected + return &webRTCTestClient{ + wc: wc, + pc: pc, + track: track, + }, nil +} + +func (c *webRTCTestClient) close() { + c.pc.Close() + c.wc.Close() +} + +func TestWebRTCServer(t *testing.T) { + p, ok := newInstance("paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + medi := &media.Media{ + Type: media.TypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + } + + v := gortsplib.TransportTCP + source := gortsplib.Client{ + Transport: &v, + } + err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + c, err := newWebRTCTestClient("ws://localhost:8889/stream/ws") + require.NoError(t, err) + defer c.close() + time.Sleep(500 * time.Millisecond) source.WritePacketRTP(medi, &rtp.Packet{ @@ -130,7 +194,7 @@ func TestWebRTCServer(t *testing.T) { Payload: []byte{0x01, 0x02, 0x03, 0x04}, }) - trak := <-track + trak := <-c.track pkt, _, err := trak.ReadRTP() require.NoError(t, err)