Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows reading, publishing, proxying, recording and playing back video and audio streams.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

235 lines
4.3 KiB

package core
import (
	"encoding/json"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/aler9/gortsplib/v2"
	"github.com/aler9/gortsplib/v2/pkg/format"
	"github.com/aler9/gortsplib/v2/pkg/media"
	"github.com/gorilla/websocket"
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"
	"github.com/stretchr/testify/require"
)
// webRTCTestClient is a minimal WebRTC reader used by the tests in this file.
// It bundles the WebSocket signaling connection with the peer connection that
// was negotiated over it.
type webRTCTestClient struct {
	// wc is the WebSocket connection used for signaling.
	wc *websocket.Conn

	// pc is the peer connection negotiated through wc.
	pc *webrtc.PeerConnection

	// track delivers the remote tracks handed to the OnTrack callback.
	track chan *webrtc.TrackRemote

	// closed is closed once pc reports the closed connection state.
	closed chan struct{}
}
// newWebRTCTestClient dials the WebSocket signaling endpoint at addr,
// negotiates a receive-only video peer connection over it and returns once
// that peer connection reports the connected state.
//
// The signaling exchange, in order, is:
//  1. the first message received is decoded as the list of ICE servers;
//  2. the local SDP offer is sent and the reply is decoded as the remote
//     session description;
//  3. local ICE candidates are sent as they are found, while every further
//     incoming message is decoded as a remote ICE candidate.
//
// An error is returned if any of these steps fails, or if the peer connection
// reports the failed or closed state before it has connected. On error both
// the peer connection (if already created) and the WebSocket are closed.
func newWebRTCTestClient(addr string) (*webRTCTestClient, error) {
	wc, res, err := websocket.DefaultDialer.Dial(addr, nil)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// established is flipped right before the successful return; until then
	// every exit path releases whatever has been acquired so far.
	established := false
	defer func() {
		if !established {
			wc.Close()
		}
	}()

	_, msg, err := wc.ReadMessage()
	if err != nil {
		return nil, err
	}

	var iceServers []webrtc.ICEServer
	if err = json.Unmarshal(msg, &iceServers); err != nil {
		return nil, err
	}

	pc, err := newPeerConnection(webrtc.Configuration{
		ICEServers: iceServers,
	})
	if err != nil {
		return nil, err
	}
	defer func() {
		if !established {
			pc.Close()
		}
	}()

	pc.OnICECandidate(func(i *webrtc.ICECandidate) {
		// The handler is also invoked with nil when gathering is finished;
		// there is nothing to forward in that case.
		if i == nil {
			return
		}

		enc, err := json.Marshal(i.ToJSON())
		if err != nil {
			return
		}

		// Best effort: a callback has nowhere to report a write error, and a
		// broken signaling channel also makes the reads on wc fail.
		_ = wc.WriteMessage(websocket.TextMessage, enc)
	})

	connected := make(chan struct{})
	failed := make(chan struct{})
	closed := make(chan struct{})

	// signal closes ch unless it has been closed already. It is only called
	// with stateChangeMutex held, so the check and the close cannot race.
	signal := func(ch chan struct{}) {
		select {
		case <-ch:
		default:
			close(ch)
		}
	}

	var stateChangeMutex sync.Mutex
	pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
		stateChangeMutex.Lock()
		defer stateChangeMutex.Unlock()

		// Ignore anything reported after the connection has been closed.
		select {
		case <-closed:
			return
		default:
		}

		switch state {
		case webrtc.PeerConnectionStateConnected:
			// The connected state can be reported more than once (for
			// instance after a temporary disconnection), so a plain close()
			// could panic here.
			signal(connected)

		case webrtc.PeerConnectionStateFailed:
			signal(failed)

		case webrtc.PeerConnectionStateClosed:
			close(closed)
		}
	})

	track := make(chan *webrtc.TrackRemote, 1)
	pc.OnTrack(func(trak *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		track <- trak
	})

	if _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
		return nil, err
	}

	localOffer, err := pc.CreateOffer(nil)
	if err != nil {
		return nil, err
	}

	enc, err := json.Marshal(localOffer)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the offer is written before SetLocalDescription is called,
	// presumably so that this write cannot overlap with the candidate writes
	// issued from the OnICECandidate callback - keep this order.
	if err = wc.WriteMessage(websocket.TextMessage, enc); err != nil {
		return nil, err
	}

	if err = pc.SetLocalDescription(localOffer); err != nil {
		return nil, err
	}

	_, msg, err = wc.ReadMessage()
	if err != nil {
		return nil, err
	}

	var answer webrtc.SessionDescription
	if err = json.Unmarshal(msg, &answer); err != nil {
		return nil, err
	}

	if err = pc.SetRemoteDescription(answer); err != nil {
		return nil, err
	}

	// Feed remote ICE candidates to the peer connection. The goroutine ends
	// when reading fails (which includes wc being closed) or when a message
	// cannot be decoded.
	go func() {
		for {
			_, msg, err := wc.ReadMessage()
			if err != nil {
				return
			}

			var candidate webrtc.ICECandidateInit
			if err = json.Unmarshal(msg, &candidate); err != nil {
				return
			}

			// Best effort: a candidate that is rejected is skipped.
			_ = pc.AddICECandidate(candidate)
		}
	}()

	// Wait for the outcome of the connection attempt instead of blocking
	// forever when the connection cannot be established.
	select {
	case <-connected:
	case <-failed:
		return nil, errors.New("peer connection failed")
	case <-closed:
		return nil, errors.New("peer connection closed before being established")
	}

	established = true

	return &webRTCTestClient{
		wc:     wc,
		pc:     pc,
		track:  track,
		closed: closed,
	}, nil
}
// close shuts down the peer connection first, then the signaling WebSocket,
// and finally blocks until the closed channel has been closed.
func (c *webRTCTestClient) close() {
	// Close errors are deliberately discarded: this is test teardown.
	_ = c.pc.Close()
	_ = c.wc.Close()

	<-c.closed
}
// TestWebRTCServer publishes an H264 stream to the "stream" path over RTSP,
// reads the same path through the WebRTC endpoint and checks that an RTP
// packet written by the publisher reaches the WebRTC client.
func TestWebRTCServer(t *testing.T) {
	p, ok := newInstance("paths:\n" +
		" all:\n")
	require.Equal(t, true, ok)
	defer p.Close()

	medi := &media.Media{
		Type: media.TypeVideo,
		Formats: []format.Format{&format.H264{
			PayloadTyp:        96,
			PacketizationMode: 1,
		}},
	}

	v := gortsplib.TransportTCP
	source := gortsplib.Client{
		Transport: &v,
	}

	err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi})
	require.NoError(t, err)
	defer source.Close()

	c, err := newWebRTCTestClient("ws://localhost:8889/stream/ws")
	require.NoError(t, err)
	defer c.close()

	// NOTE(review): presumably gives the server time to finish setting up the
	// reader before the single test packet is sent - confirm.
	time.Sleep(500 * time.Millisecond)

	// A failed write would otherwise only show up as a hang on the track
	// channel below, so check it right away.
	err = source.WritePacketRTP(medi, &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			Marker:         true,
			PayloadType:    96,
			SequenceNumber: 123,
			Timestamp:      45343,
			SSRC:           563423,
		},
		Payload: []byte{0x01, 0x02, 0x03, 0x04},
	})
	require.NoError(t, err)

	// Bound the wait so that a packet that never arrives fails this test
	// instead of stalling it until the global test timeout.
	var trak *webrtc.TrackRemote
	select {
	case trak = <-c.track:
	case <-time.After(10 * time.Second):
		t.Fatal("timed out waiting for the WebRTC track")
	}

	pkt, _, err := trak.ReadRTP()
	require.NoError(t, err)

	// The packet is expected with payload type 102 instead of the published
	// 96. Sequence number, timestamp and SSRC are taken from the received
	// packet, i.e. they are not verified.
	require.Equal(t, &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			Marker:         true,
			PayloadType:    102,
			SequenceNumber: pkt.SequenceNumber,
			Timestamp:      pkt.Timestamp,
			SSRC:           pkt.SSRC,
			CSRC:           []uint32{},
		},
		Payload: []byte{0x01, 0x02, 0x03, 0x04},
	}, pkt)
}