From 9d19ccc837d4eb336cbfe11e2088a2e0f67568c3 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 7 Jan 2023 12:33:28 +0100 Subject: [PATCH] add WebRTC and low-latency HLS tests --- go.mod | 2 +- go.sum | 4 +- internal/core/webrtc_conn.go | 13 +-- internal/core/webrtc_server_test.go | 149 ++++++++++++++++++++++++++++ internal/hls/muxer_test.go | 104 ++++++++++++++++--- 5 files changed, 247 insertions(+), 25 deletions(-) create mode 100644 internal/core/webrtc_server_test.go diff --git a/go.mod b/go.mod index 92d1084a..25c6feac 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v0.0.0 - github.com/aler9/gortsplib/v2 v2.0.0-20230106140016-a759ba9d014b + github.com/aler9/gortsplib/v2 v2.0.1 github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.8.1 diff --git a/go.sum b/go.sum index 72d7426f..db8d9a99 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aler9/go-mp4 v0.0.0-20221229200349-f3d01e787968 h1:wU8pLx4dc8bLB+JuVPWuGp+BoMkOabj98a0RmO3gqvw= github.com/aler9/go-mp4 v0.0.0-20221229200349-f3d01e787968/go.mod h1:vPl9t5ZK7K0x68jh12/+ECWBCXoWuIDtNgPtU2f04ws= -github.com/aler9/gortsplib/v2 v2.0.0-20230106140016-a759ba9d014b h1:6Yg4zJ6XowH8dJpSYfyBnp1VR4wOFvCiNdkSdhK+OWQ= -github.com/aler9/gortsplib/v2 v2.0.0-20230106140016-a759ba9d014b/go.mod h1:lMdAxc6daduSzVwh75yQkvH9UHCYHpng/vJ8uXKFzdA= +github.com/aler9/gortsplib/v2 v2.0.1 h1:qhuyclmBdyOcL6vhZg0QAvecStWHg+K8+2G9bzGhyGw= +github.com/aler9/gortsplib/v2 v2.0.1/go.mod h1:lMdAxc6daduSzVwh75yQkvH9UHCYHpng/vJ8uXKFzdA= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go index 67b057f9..a17dad1f 100644 --- a/internal/core/webrtc_conn.go +++ b/internal/core/webrtc_conn.go @@ -33,7 +33,8 @@ import ( ) const ( - handshakeDeadline = 10 * time.Second + webrtcHandshakeDeadline = 10 * time.Second + webrtcPayloadMaxSize = 1200 ) // newPeerConnection creates a PeerConnection with the default codecs and @@ -276,8 +277,8 @@ func (c *webRTCConn) runInner(ctx context.Context) error { } // maximum deadline to complete the handshake - c.wsconn.SetReadDeadline(time.Now().Add(handshakeDeadline)) - c.wsconn.SetWriteDeadline(time.Now().Add(handshakeDeadline)) + c.wsconn.SetReadDeadline(time.Now().Add(webrtcHandshakeDeadline)) + c.wsconn.SetWriteDeadline(time.Now().Add(webrtcHandshakeDeadline)) err = c.writeICEServers(c.genICEServers()) if err != nil { @@ -495,7 +496,7 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) encoder := &rtpvp9.Encoder{ PayloadType: 96, - PayloadMaxSize: 1200, + PayloadMaxSize: webrtcPayloadMaxSize, } encoder.Init() @@ -542,7 +543,7 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) encoder := &rtpvp8.Encoder{ PayloadType: 96, - PayloadMaxSize: 1200, + PayloadMaxSize: webrtcPayloadMaxSize, } encoder.Init() @@ -589,7 +590,7 @@ func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) encoder := &rtph264.Encoder{ PayloadType: 96, - PayloadMaxSize: 1200, + PayloadMaxSize: webrtcPayloadMaxSize, } encoder.Init() diff --git a/internal/core/webrtc_server_test.go b/internal/core/webrtc_server_test.go new file mode 100644 index 00000000..fda3c5f6 --- /dev/null +++ b/internal/core/webrtc_server_test.go @@ -0,0 +1,149 @@ +package core + +import ( + "encoding/json" + "testing" + "time" + + "github.com/aler9/gortsplib/v2" + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/gorilla/websocket" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +func TestWebRTCServer(t *testing.T) { + p, ok := newInstance("paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + medi := &media.Media{ + Type: media.TypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + } + + v := gortsplib.TransportTCP + source := gortsplib.Client{ + Transport: &v, + } + err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + c, _, err := websocket.DefaultDialer.Dial("ws://localhost:8889/stream/ws", nil) //nolint:bodyclose + require.NoError(t, err) + defer c.Close() + + _, msg, err := c.ReadMessage() + require.NoError(t, err) + + var iceServers []webrtc.ICEServer + err = json.Unmarshal(msg, &iceServers) + require.NoError(t, err) + + pc, err := newPeerConnection(webrtc.Configuration{ + ICEServers: iceServers, + }) + require.NoError(t, err) + defer pc.Close() + + pc.OnICECandidate(func(i *webrtc.ICECandidate) { + if i != nil { + enc, _ := json.Marshal(i.ToJSON()) + c.WriteMessage(websocket.TextMessage, enc) + } + }) + + connected := make(chan struct{}) + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + if state == webrtc.PeerConnectionStateConnected { + close(connected) + } + }) + + track := make(chan *webrtc.TrackRemote) + pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) { + track <- trak + }) + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo) + require.NoError(t, err) + + localOffer, err := pc.CreateOffer(nil) + require.NoError(t, err) + + enc, err := json.Marshal(localOffer) + require.NoError(t, err) + + err = c.WriteMessage(websocket.TextMessage, enc) + require.NoError(t, err) + + err = pc.SetLocalDescription(localOffer) + require.NoError(t, err) + + _, msg, err = c.ReadMessage() + require.NoError(t, err) + + var remoteOffer webrtc.SessionDescription + err = json.Unmarshal(msg, &remoteOffer) + require.NoError(t, err) + + err = pc.SetRemoteDescription(remoteOffer) + require.NoError(t, err) + + go func() { + for { + _, msg, err := c.ReadMessage() + if err != nil { + return + } + + var candidate webrtc.ICECandidateInit + err = json.Unmarshal(msg, &candidate) + if err != nil { + return + } + + pc.AddICECandidate(candidate) + } + }() + + <-connected + + time.Sleep(500 * time.Millisecond) + + source.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }) + + trak := <-track + + pkt, _, err := trak.ReadRTP() + require.NoError(t, err) + require.Equal(t, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 102, + SequenceNumber: pkt.SequenceNumber, + Timestamp: pkt.Timestamp, + SSRC: pkt.SSRC, + CSRC: []uint32{}, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }, pkt) +} diff --git a/internal/hls/muxer_test.go b/internal/hls/muxer_test.go index ac19daf1..0dc819dc 100644 --- a/internal/hls/muxer_test.go +++ b/internal/hls/muxer_test.go @@ -2,6 +2,7 @@ package hls import ( "io" + "net/http" "regexp" "testing" "time" @@ -44,13 +45,19 @@ func TestMuxerVideoAudio(t *testing.T) { for _, ca := range []string{ "mpegts", "fmp4", + "lowLatency", } { t.Run(ca, func(t *testing.T) { var v MuxerVariant - if ca == "mpegts" { + switch ca { + case "mpegts": v = MuxerVariantMPEGTS - } else { + + case "fmp4": v = MuxerVariantFMP4 + + case "lowLatency": + v = MuxerVariantLowLatency } m, err := NewMuxer(v, 3, 1*time.Second, 0, 50*1024*1024, videoTrack, audioTrack) @@ -116,14 +123,16 @@ func TestMuxerVideoAudio(t *testing.T) { byts, err := io.ReadAll(m.File("index.m3u8", "", "", "").Body) require.NoError(t, err) - if ca == "mpegts" { + switch ca { + case "mpegts": require.Equal(t, "#EXTM3U\n"+ "#EXT-X-VERSION:3\n"+ "#EXT-X-INDEPENDENT-SEGMENTS\n"+ "\n"+ "#EXT-X-STREAM-INF:BANDWIDTH=200000,CODECS=\"avc1.42c028,mp4a.40.2\"\n"+ "stream.m3u8\n", string(byts)) - } else { + + case "fmp4", "lowLatency": require.Equal(t, "#EXTM3U\n"+ "#EXT-X-VERSION:9\n"+ "#EXT-X-INDEPENDENT-SEGMENTS\n"+ @@ -135,8 +144,8 @@ func TestMuxerVideoAudio(t *testing.T) { byts, err = io.ReadAll(m.File("stream.m3u8", "", "", "").Body) require.NoError(t, err) - var ma []string - if ca == "mpegts" { + switch ca { + case "mpegts": re := regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:3\n` + `#EXT-X-ALLOW-CACHE:NO\n` + @@ -148,8 +157,15 @@ func TestMuxerVideoAudio(t *testing.T) { `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1,\n` + `(seg1\.ts)\n$`) - ma = re.FindStringSubmatch(string(byts)) - } else { + ma := re.FindStringSubmatch(string(byts)) + require.NotEqual(t, 0, len(ma)) + + seg := m.File(ma[2], "", "", "") + require.Equal(t, http.StatusOK, seg.Status) + _, err := io.ReadAll(seg.Body) + require.NoError(t, err) + + case "fmp4": re := regexp.MustCompile(`^#EXTM3U\n` + `#EXT-X-VERSION:9\n` + `#EXT-X-TARGETDURATION:4\n` + @@ -161,19 +177,75 @@ func TestMuxerVideoAudio(t *testing.T) { `#EXT-X-PROGRAM-DATE-TIME:(.*?)\n` + `#EXTINF:1.00000,\n` + `(seg1\.mp4)\n$`) - ma = re.FindStringSubmatch(string(byts)) - } - require.NotEqual(t, 0, len(ma)) + ma := re.FindStringSubmatch(string(byts)) + require.NotEqual(t, 0, len(ma)) - if ca == "mpegts" { - _, err := io.ReadAll(m.File(ma[2], "", "", "").Body) + init := m.File("init.mp4", "", "", "") + require.Equal(t, http.StatusOK, init.Status) + _, err := io.ReadAll(init.Body) require.NoError(t, err) - } else { - _, err := io.ReadAll(m.File("init.mp4", "", "", "").Body) + + seg := m.File(ma[2], "", "", "") + require.Equal(t, http.StatusOK, seg.Status) + _, err = io.ReadAll(seg.Body) require.NoError(t, err) - _, err = io.ReadAll(m.File(ma[2], "", "", "").Body) + case "lowLatency": + require.Equal(t, + "#EXTM3U\n"+ + "#EXT-X-VERSION:9\n"+ + "#EXT-X-TARGETDURATION:4\n"+ + "#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=5.00000,CAN-SKIP-UNTIL=24\n"+ + "#EXT-X-PART-INF:PART-TARGET=2\n"+ + "#EXT-X-MEDIA-SEQUENCE:2\n"+ + "#EXT-X-MAP:URI=\"init.mp4\"\n"+ + "#EXT-X-GAP\n"+ + "#EXTINF:4.00000,\n"+ + "gap.mp4\n"+ + "#EXT-X-GAP\n"+ + "#EXTINF:4.00000,\n"+ + "gap.mp4\n"+ + "#EXT-X-GAP\n"+ + "#EXTINF:4.00000,\n"+ + "gap.mp4\n"+ + "#EXT-X-GAP\n"+ + "#EXTINF:4.00000,\n"+ + "gap.mp4\n"+ + "#EXT-X-GAP\n"+ + "#EXTINF:4.00000,\n"+ + "gap.mp4\n"+ + "#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:02Z\n"+ + "#EXT-X-PART:DURATION=2.00000,URI=\"part0.mp4\",INDEPENDENT=YES\n"+ + "#EXT-X-PART:DURATION=2.00000,URI=\"part1.mp4\"\n"+ + "#EXTINF:4.00000,\n"+ + "seg7.mp4\n"+ + "#EXT-X-PROGRAM-DATE-TIME:2010-01-01T01:01:06Z\n"+ + "#EXT-X-PART:DURATION=1.00000,URI=\"part3.mp4\",INDEPENDENT=YES\n"+ + "#EXTINF:1.00000,\n"+ + "seg8.mp4\n"+ + "#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"part4.mp4\"\n", string(byts)) + + part := m.File("part3.mp4", "", "", "") + require.Equal(t, http.StatusOK, part.Status) + _, err = io.ReadAll(part.Body) require.NoError(t, err) + + recv := make(chan struct{}) + + go func() { + part = m.File("part4.mp4", "", "", "") + _, err := io.ReadAll(part.Body) + require.NoError(t, err) + close(recv) + }() + + d = 9 * time.Second + err = m.WriteH26x(testTime.Add(d-1*time.Second), d, [][]byte{ + {1}, // non-IDR + }) + require.NoError(t, err) + + <-recv } }) }