Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1282 lines
30 KiB

package core
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
srt "github.com/datarhei/gosrt"
"github.com/google/uuid"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
)
var testFormatH264 = &format.H264{
PayloadTyp: 96,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
PacketizationMode: 1,
}
var testMediaH264 = &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{testFormatH264},
}
var testMediaAAC = &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
}
func checkClose(t *testing.T, closeFunc func() error) {
require.NoError(t, closeFunc())
}
func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in interface{}, out interface{}) {
buf := func() io.Reader {
if in == nil {
return nil
}
byts, err := json.Marshal(in)
require.NoError(t, err)
return bytes.NewBuffer(byts)
}()
req, err := http.NewRequest(method, ur, buf)
require.NoError(t, err)
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
t.Errorf("bad status code: %d", res.StatusCode)
}
if out == nil {
return
}
err = json.NewDecoder(res.Body).Decode(out)
require.NoError(t, err)
}
func checkError(t *testing.T, msg string, body io.Reader) {
var resErr map[string]interface{}
err := json.NewDecoder(body).Decode(&resErr)
require.NoError(t, err)
require.Equal(t, map[string]interface{}{"error": msg}, resErr)
}
func TestAPIPathsList(t *testing.T) {
type pathSource struct {
Type string `json:"type"`
}
type path struct {
Name string `json:"name"`
Source pathSource `json:"source"`
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type pathList struct {
ItemCount int `json:"itemCount"`
PageCount int `json:"pageCount"`
Items []path `json:"items"`
}
t.Run("rtsp session", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
media0 := testMediaH264
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{
media0,
testMediaAAC,
}})
require.NoError(t, err)
defer source.Close()
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
require.Equal(t, pathList{
ItemCount: 1,
PageCount: 1,
Items: []path{{
Name: "mypath",
Source: pathSource{
Type: "rtspSession",
},
Ready: true,
Tracks: []string{"H264", "MPEG-4 Audio"},
BytesReceived: 17,
}},
}, out)
})
t.Run("rtsps session", func(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("api: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" mypath:\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
source := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source.StartRecording("rtsps://localhost:8322/mypath",
&description.Session{Medias: []*description.Media{
testMediaH264,
testMediaAAC,
}})
require.NoError(t, err)
defer source.Close()
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
require.Equal(t, pathList{
ItemCount: 1,
PageCount: 1,
Items: []path{{
Name: "mypath",
Source: pathSource{
Type: "rtspsSession",
},
Ready: true,
Tracks: []string{"H264", "MPEG-4 Audio"},
}},
}, out)
})
t.Run("rtsp source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: rtsp://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
require.Equal(t, pathList{
ItemCount: 1,
PageCount: 1,
Items: []path{{
Name: "mypath",
Source: pathSource{
Type: "rtspSource",
},
Ready: false,
Tracks: []string{},
}},
}, out)
})
t.Run("rtmp source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: rtmp://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
require.Equal(t, pathList{
ItemCount: 1,
PageCount: 1,
Items: []path{{
Name: "mypath",
Source: pathSource{
Type: "rtmpSource",
},
Ready: false,
Tracks: []string{},
}},
}, out)
})
t.Run("hls source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: http://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
require.Equal(t, pathList{
ItemCount: 1,
PageCount: 1,
Items: []path{{
Name: "mypath",
Source: pathSource{
Type: "hlsSource",
},
Ready: false,
Tracks: []string{},
}},
}, out)
})
}
func TestAPIPathsGet(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" all_others:\n")
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
for _, ca := range []string{"ok", "ok-nested", "not found"} {
t.Run(ca, func(t *testing.T) {
type pathSource struct {
Type string `json:"type"`
}
type path struct {
Name string `json:"name"`
Source pathSource `json:"source"`
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var pathName string
switch ca {
case "ok":
pathName = "mypath"
case "ok-nested":
pathName = "my/nested/path"
case "not found":
pathName = "nonexisting"
}
if ca == "ok" || ca == "ok-nested" {
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/"+pathName,
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer source.Close()
var out path
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/get/"+pathName, nil, &out)
require.Equal(t, path{
Name: pathName,
Source: pathSource{
Type: "rtspSession",
},
Ready: true,
Tracks: []string{"H264"},
}, out)
} else {
res, err := hc.Get("http://localhost:9997/v3/paths/get/" + pathName)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
checkError(t, "path not found", res.Body)
}
})
}
}
func TestAPIProtocolList(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
medi := testMediaH264
switch ca { //nolint:dupl
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath?key=val",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtsps conns", "rtsps sessions":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
err := source.StartRecording("rtsps://localhost:8322/mypath?key=val",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtmp", "rtmps":
var port string
if ca == "rtmp" {
port = "1935"
} else {
port = "1936"
}
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath?key=val")
require.NoError(t, err)
nconn, err := func() (net.Conn, error) {
if ca == "rtmp" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
_, err = rtmp.NewWriter(conn, testFormatH264, nil)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
case "hls":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
go func() {
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
/*source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
testSPS,
0x05,
},
})
[]byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},*/
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
// testSPS,
0x05,
},
})
require.NoError(t, err)
}
}()
func() {
res, err := hc.Get("http://localhost:8888/mypath/index.m3u8")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, 200, res.StatusCode)
}()
case "webrtc":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
u, err := url.Parse("http://localhost:8889/mypath/whep?key=val")
require.NoError(t, err)
go func() {
time.Sleep(500 * time.Millisecond)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
}()
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
case "srt":
conf := srt.DefaultConfig()
conf.StreamId = "publish:mypath:::key=val"
conn, err := srt.Dial("srt", "localhost:8890", conf)
require.NoError(t, err)
defer conn.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(conn)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
}
switch ca {
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt":
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "rtmps":
pa = "rtmpsconns"
case "srt":
pa = "srtconns"
}
type item struct {
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, item{
State: "publish",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
}
case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/list", nil, &out)
s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Items[0].Created)
require.Regexp(t, s, out.Items[0].LastRequest)
case "webrtc":
type item struct {
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/list", nil, &out)
require.Equal(t, item{
PeerConnectionEstablished: true,
State: "read",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
}
})
}
}
func TestAPIProtocolGet(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
medi := testMediaH264
switch ca { //nolint:dupl
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtsps conns", "rtsps sessions":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
err := source.StartRecording("rtsps://localhost:8322/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtmp", "rtmps":
var port string
if ca == "rtmp" {
port = "1935"
} else {
port = "1936"
}
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath")
require.NoError(t, err)
nconn, err := func() (net.Conn, error) {
if ca == "rtmp" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
_, err = rtmp.NewWriter(conn, testFormatH264, nil)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
case "hls":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
go func() {
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
/*source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
testSPS,
0x05,
},
})
[]byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},*/
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
// testSPS,
0x05,
},
})
require.NoError(t, err)
}
}()
func() {
res, err := hc.Get("http://localhost:8888/mypath/index.m3u8")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, 200, res.StatusCode)
}()
case "webrtc":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
u, err := url.Parse("http://localhost:8889/mypath/whep")
require.NoError(t, err)
go func() {
time.Sleep(500 * time.Millisecond)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
}()
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
case "srt":
conf := srt.DefaultConfig()
conf.StreamId = "publish:mypath"
conn, err := srt.Dial("srt", "localhost:8890", conf)
require.NoError(t, err)
defer conn.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(conn)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
}
switch ca {
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt":
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "rtmps":
pa = "rtmpsconns"
case "srt":
pa = "srtconns"
}
type item struct {
ID string `json:"id"`
State string `json:"state"`
}
var out1 struct {
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out1)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out1.Items[0].State)
}
var out2 item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/get/"+out1.Items[0].ID, nil, &out2)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out2.State)
}
case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/get/mypath", nil, &out)
s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Created)
require.Regexp(t, s, out.LastRequest)
case "webrtc":
type item struct {
ID string `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var out1 struct {
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/list", nil, &out1)
var out2 item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/get/"+out1.Items[0].ID, nil, &out2)
require.Equal(t, true, out2.PeerConnectionEstablished)
}
})
}
}
func TestAPIProtocolGetNotFound(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "rtmps":
pa = "rtmpsconns"
case "hls":
pa = "hlsmuxers"
case "webrtc":
pa = "webrtcsessions"
case "srt":
pa = "srtconns"
}
func() {
req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/"+pa+"/get/"+uuid.New().String(), nil)
require.NoError(t, err)
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
switch ca {
case "rtsp conns", "rtsps conns", "rtmp", "rtmps", "srt":
checkError(t, "connection not found", res.Body)
case "rtsp sessions", "rtsps sessions", "webrtc":
checkError(t, "session not found", res.Body)
case "hls":
checkError(t, "muxer not found", res.Body)
}
}()
})
}
}
func TestAPIProtocolKick(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp",
"rtsps",
"rtmp",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
if ca == "rtsps" {
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
medi := testMediaH264
switch ca {
case "rtsp":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtsps":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
err := source.StartRecording("rtsps://localhost:8322/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtmp":
u, err := url.Parse("rtmp://localhost:1935/mypath")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
_, err = rtmp.NewWriter(conn, testFormatH264, nil)
require.NoError(t, err)
case "webrtc":
u, err := url.Parse("http://localhost:8889/mypath/whip")
require.NoError(t, err)
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Publish(context.Background(), medi.Formats[0], nil)
require.NoError(t, err)
defer func() {
require.Error(t, c.Close())
}()
case "srt":
conf := srt.DefaultConfig()
conf.StreamId = "publish:mypath"
conn, err := srt.Dial("srt", "localhost:8890", conf)
require.NoError(t, err)
defer conn.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(conn)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
}
var pa string
switch ca {
case "rtsp":
pa = "rtspsessions"
case "rtsps":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "webrtc":
pa = "webrtcsessions"
case "srt":
pa = "srtconns"
}
var out1 struct {
Items []struct {
ID string `json:"id"`
} `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out1)
httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/"+pa+"/kick/"+out1.Items[0].ID, nil, nil)
var out2 struct {
Items []struct {
ID string `json:"id"`
} `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out2)
require.Equal(t, 0, len(out2.Items))
})
}
}
func TestAPIProtocolKickNotFound(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp",
"rtsps",
"rtmp",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
if ca == "rtsps" {
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
var pa string
switch ca {
case "rtsp":
pa = "rtspsessions"
case "rtsps":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "webrtc":
pa = "webrtcsessions"
case "srt":
pa = "srtconns"
}
func() {
req, err := http.NewRequest(http.MethodPost, "http://localhost:9997/v3/"+pa+"/kick/"+uuid.New().String(), nil)
require.NoError(t, err)
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
switch ca {
case "rtsp conns", "rtsps conns", "rtmp", "rtmps", "srt":
checkError(t, "connection not found", res.Body)
case "rtsp sessions", "rtsps sessions", "webrtc":
checkError(t, "session not found", res.Body)
case "hls":
checkError(t, "muxer not found", res.Body)
}
}()
})
}
}