Browse Source

api: add more attributes to WebRTC connections

new attributes: peerConnectionEstablished, localCandidate, remoteCandidate
pull/1364/head
aler9 3 years ago
parent
commit
e7e8d5ce20
  1. 6
      apidocs/openapi.yaml
  2. 38
      internal/core/api_test.go
  3. 6
      internal/core/core.go
  4. 122
      internal/core/webrtc_conn.go
  5. 22
      internal/core/webrtc_server.go
  6. 146
      internal/core/webrtc_server_test.go

6
apidocs/openapi.yaml

@ -534,6 +534,12 @@ components:
type: string type: string
remoteAddr: remoteAddr:
type: string type: string
peerConnectionEstablished:
type: bool
localCandidate:
type: string
remoteCandidate:
type: string
bytesReceived: bytesReceived:
type: number type: number
bytesSent: bytesSent:

38
internal/core/api_test.go

@ -405,6 +405,7 @@ func TestAPIProtocolSpecificList(t *testing.T) {
"rtmp", "rtmp",
"rtmps", "rtmps",
"hls", "hls",
"webrtc",
} { } {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
conf := "api: yes\n" conf := "api: yes\n"
@ -490,7 +491,6 @@ func TestAPIProtocolSpecificList(t *testing.T) {
case "hls": case "hls":
source := gortsplib.Client{} source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath", err := source.StartRecording("rtsp://localhost:8554/mypath",
media.Medias{medi}) media.Medias{medi})
require.NoError(t, err) require.NoError(t, err)
@ -502,6 +502,17 @@ func TestAPIProtocolSpecificList(t *testing.T) {
defer res.Body.Close() defer res.Body.Close()
require.Equal(t, 200, res.StatusCode) require.Equal(t, 200, res.StatusCode)
}() }()
case "webrtc":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c, err := newWebRTCTestClient("ws://localhost:8889/mypath/ws")
require.NoError(t, err)
defer c.close()
} }
switch ca { switch ca {
@ -562,6 +573,31 @@ func TestAPIProtocolSpecificList(t *testing.T) {
s := fmt.Sprintf("^%d-", time.Now().Year()) s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Items[firstID].Created) require.Regexp(t, s, out.Items[firstID].Created)
require.Regexp(t, s, out.Items[firstID].LastRequest) require.Regexp(t, s, out.Items[firstID].LastRequest)
case "webrtc":
type item struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var out struct {
Items map[string]item `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/webrtcconns/list", nil, &out)
require.NoError(t, err)
var firstID string
for k := range out.Items {
firstID = k
}
itm := out.Items[firstID]
require.Equal(t, true, itm.PeerConnectionEstablished)
} }
}) })
} }

6
internal/core/core.go

@ -565,7 +565,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closePathManager || closePathManager ||
closeMetrics closeMetrics
closeWebrtcServer := newConf == nil || closeWebRTCServer := newConf == nil ||
newConf.WebRTCDisable != p.conf.WebRTCDisable || newConf.WebRTCDisable != p.conf.WebRTCDisable ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.WebRTCAddress != p.conf.WebRTCAddress || newConf.WebRTCAddress != p.conf.WebRTCAddress ||
@ -590,7 +590,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeRTSPSServer || closeRTSPSServer ||
closeRTMPServer || closeRTMPServer ||
closeHLSServer || closeHLSServer ||
closeWebrtcServer closeWebRTCServer
if newConf == nil && p.confWatcher != nil { if newConf == nil && p.confWatcher != nil {
p.confWatcher.Close() p.confWatcher.Close()
@ -621,7 +621,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.pathManager = nil p.pathManager = nil
} }
if closeWebrtcServer && p.webRTCServer != nil { if closeWebRTCServer && p.webRTCServer != nil {
p.webRTCServer.close() p.webRTCServer.close()
p.webRTCServer = nil p.webRTCServer = nil
} }

122
internal/core/webrtc_conn.go

@ -62,37 +62,6 @@ func newPeerConnection(configuration webrtc.Configuration,
return api.NewPeerConnection(configuration) return api.NewPeerConnection(configuration)
} }
func describeActiveCandidates(pc *webrtc.PeerConnection) (string, string) {
var lcid string
var rcid string
for _, stats := range pc.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
lcid = tstats.LocalCandidateID
rcid = tstats.RemoteCandidateID
break
}
}
var ldesc string
var rdesc string
for _, stats := range pc.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok {
str := tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
if tstats.ID == lcid {
ldesc = str
} else if tstats.ID == rcid {
rdesc = str
}
}
}
return ldesc, rdesc
}
type webRTCTrack struct { type webRTCTrack struct {
media *media.Media media *media.Media
format format.Format format format.Format
@ -187,13 +156,73 @@ func (c *webRTCConn) remoteAddr() net.Addr {
return c.wsconn.RemoteAddr() return c.wsconn.RemoteAddr()
} }
func (c *webRTCConn) peerConnectionEstablished() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.curPC != nil
}
func (c *webRTCConn) localCandidate() string {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.curPC != nil {
var cid string
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.LocalCandidateID
break
}
}
if cid != "" {
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
}
return ""
}
func (c *webRTCConn) remoteCandidate() string {
c.mutex.RLock()
defer c.mutex.RUnlock()
if c.curPC != nil {
var cid string
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.RemoteCandidateID
break
}
}
if cid != "" {
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
}
return ""
}
func (c *webRTCConn) bytesReceived() uint64 { func (c *webRTCConn) bytesReceived() uint64 {
c.mutex.RLock() c.mutex.RLock()
defer c.mutex.RUnlock() defer c.mutex.RUnlock()
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok { if c.curPC != nil {
if tstats.ID == "iceTransport" { for _, stats := range c.curPC.GetStats() {
return tstats.BytesReceived if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesReceived
}
} }
} }
} }
@ -203,10 +232,13 @@ func (c *webRTCConn) bytesReceived() uint64 {
func (c *webRTCConn) bytesSent() uint64 { func (c *webRTCConn) bytesSent() uint64 {
c.mutex.RLock() c.mutex.RLock()
defer c.mutex.RUnlock() defer c.mutex.RUnlock()
for _, stats := range c.curPC.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok { if c.curPC != nil {
if tstats.ID == "iceTransport" { for _, stats := range c.curPC.GetStats() {
return tstats.BytesSent if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesSent
}
} }
} }
} }
@ -335,10 +367,6 @@ func (c *webRTCConn) runInner(ctx context.Context) error {
<-pcClosed <-pcClosed
}() }()
c.mutex.Lock()
c.curPC = pc
c.mutex.Unlock()
for _, track := range tracks { for _, track := range tracks {
rtpSender, err := pc.AddTrack(track.webRTCTrack) rtpSender, err := pc.AddTrack(track.webRTCTrack)
if err != nil { if err != nil {
@ -440,8 +468,12 @@ outer:
// in order to allow the other side of the connection // in order to allow the other side of the connection
// to switch to the "connected" state before WebSocket is closed. // to switch to the "connected" state before WebSocket is closed.
ldesc, rdesc := describeActiveCandidates(pc) c.mutex.Lock()
c.log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", ldesc, rdesc) c.curPC = pc
c.mutex.Unlock()
c.log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
c.localCandidate(), c.remoteCandidate())
ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount))
defer ringBuffer.Close() defer ringBuffer.Close()

22
internal/core/webrtc_server.go

@ -32,10 +32,13 @@ var upgrader = websocket.Upgrader{
} }
type webRTCServerAPIConnsListItem struct { type webRTCServerAPIConnsListItem struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"` PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
BytesSent uint64 `json:"bytesSent"` LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
} }
type webRTCServerAPIConnsListData struct { type webRTCServerAPIConnsListData struct {
@ -264,10 +267,13 @@ outer:
for c := range s.conns { for c := range s.conns {
data.Items[c.uuid.String()] = webRTCServerAPIConnsListItem{ data.Items[c.uuid.String()] = webRTCServerAPIConnsListItem{
Created: c.created, Created: c.created,
RemoteAddr: c.remoteAddr().String(), RemoteAddr: c.remoteAddr().String(),
BytesReceived: c.bytesReceived(), PeerConnectionEstablished: c.peerConnectionEstablished(),
BytesSent: c.bytesSent(), LocalCandidate: c.localCandidate(),
RemoteCandidate: c.remoteCandidate(),
BytesReceived: c.bytesReceived(),
BytesSent: c.bytesSent(),
} }
} }

146
internal/core/webrtc_server_test.go

@ -14,49 +14,43 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestWebRTCServer(t *testing.T) { type webRTCTestClient struct {
p, ok := newInstance("paths:\n" + wc *websocket.Conn
" all:\n") pc *webrtc.PeerConnection
require.Equal(t, true, ok) track chan *webrtc.TrackRemote
defer p.Close() }
medi := &media.Media{ func newWebRTCTestClient(addr string) (*webRTCTestClient, error) {
Type: media.TypeVideo, wc, _, err := websocket.DefaultDialer.Dial(addr, nil) //nolint:bodyclose
Formats: []format.Format{&format.H264{ if err != nil {
PayloadTyp: 96, return nil, err
PacketizationMode: 1,
}},
} }
v := gortsplib.TransportTCP _, msg, err := wc.ReadMessage()
source := gortsplib.Client{ if err != nil {
Transport: &v, wc.Close()
return nil, err
} }
err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c, _, err := websocket.DefaultDialer.Dial("ws://localhost:8889/stream/ws", nil) //nolint:bodyclose
require.NoError(t, err)
defer c.Close()
_, msg, err := c.ReadMessage()
require.NoError(t, err)
var iceServers []webrtc.ICEServer var iceServers []webrtc.ICEServer
err = json.Unmarshal(msg, &iceServers) err = json.Unmarshal(msg, &iceServers)
require.NoError(t, err) if err != nil {
wc.Close()
return nil, err
}
pc, err := newPeerConnection(webrtc.Configuration{ pc, err := newPeerConnection(webrtc.Configuration{
ICEServers: iceServers, ICEServers: iceServers,
}) })
require.NoError(t, err) if err != nil {
defer pc.Close() wc.Close()
return nil, err
}
pc.OnICECandidate(func(i *webrtc.ICECandidate) { pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil { if i != nil {
enc, _ := json.Marshal(i.ToJSON()) enc, _ := json.Marshal(i.ToJSON())
c.WriteMessage(websocket.TextMessage, enc) wc.WriteMessage(websocket.TextMessage, enc)
} }
}) })
@ -67,39 +61,71 @@ func TestWebRTCServer(t *testing.T) {
} }
}) })
track := make(chan *webrtc.TrackRemote) track := make(chan *webrtc.TrackRemote, 1)
pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) { pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) {
track <- trak track <- trak
}) })
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo) _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
localOffer, err := pc.CreateOffer(nil) localOffer, err := pc.CreateOffer(nil)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
enc, err := json.Marshal(localOffer) enc, err := json.Marshal(localOffer)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = c.WriteMessage(websocket.TextMessage, enc) err = wc.WriteMessage(websocket.TextMessage, enc)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = pc.SetLocalDescription(localOffer) err = pc.SetLocalDescription(localOffer)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
_, msg, err = c.ReadMessage() _, msg, err = wc.ReadMessage()
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
var remoteOffer webrtc.SessionDescription var remoteOffer webrtc.SessionDescription
err = json.Unmarshal(msg, &remoteOffer) err = json.Unmarshal(msg, &remoteOffer)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = pc.SetRemoteDescription(remoteOffer) err = pc.SetRemoteDescription(remoteOffer)
require.NoError(t, err) if err != nil {
wc.Close()
pc.Close()
return nil, err
}
go func() { go func() {
for { for {
_, msg, err := c.ReadMessage() _, msg, err := wc.ReadMessage()
if err != nil { if err != nil {
return return
} }
@ -116,6 +142,44 @@ func TestWebRTCServer(t *testing.T) {
<-connected <-connected
return &webRTCTestClient{
wc: wc,
pc: pc,
track: track,
}, nil
}
func (c *webRTCTestClient) close() {
c.pc.Close()
c.wc.Close()
}
func TestWebRTCServer(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
}
v := gortsplib.TransportTCP
source := gortsplib.Client{
Transport: &v,
}
err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c, err := newWebRTCTestClient("ws://localhost:8889/stream/ws")
require.NoError(t, err)
defer c.close()
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
source.WritePacketRTP(medi, &rtp.Packet{ source.WritePacketRTP(medi, &rtp.Packet{
@ -130,7 +194,7 @@ func TestWebRTCServer(t *testing.T) {
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x01, 0x02, 0x03, 0x04},
}) })
trak := <-track trak := <-c.track
pkt, _, err := trak.ReadRTP() pkt, _, err := trak.ReadRTP()
require.NoError(t, err) require.NoError(t, err)

Loading…
Cancel
Save