15 changed files with 446 additions and 618 deletions
@ -1,385 +0,0 @@ |
|||||||
package core //nolint:dupl
|
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"crypto/tls" |
|
||||||
"encoding/json" |
|
||||||
"net" |
|
||||||
"net/http" |
|
||||||
"net/url" |
|
||||||
"os" |
|
||||||
"testing" |
|
||||||
"time" |
|
||||||
|
|
||||||
"github.com/bluenviron/gortsplib/v4/pkg/format" |
|
||||||
"github.com/stretchr/testify/require" |
|
||||||
|
|
||||||
"github.com/bluenviron/mediamtx/internal/protocols/rtmp" |
|
||||||
"github.com/bluenviron/mediamtx/internal/test" |
|
||||||
) |
|
||||||
|
|
||||||
type testHTTPAuthenticator struct { |
|
||||||
*http.Server |
|
||||||
} |
|
||||||
|
|
||||||
func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator { |
|
||||||
firstReceived := false |
|
||||||
|
|
||||||
ts := &testHTTPAuthenticator{} |
|
||||||
|
|
||||||
ts.Server = &http.Server{ |
|
||||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
|
||||||
require.Equal(t, http.MethodPost, r.Method) |
|
||||||
require.Equal(t, "/auth", r.URL.Path) |
|
||||||
|
|
||||||
var in struct { |
|
||||||
IP string `json:"ip"` |
|
||||||
User string `json:"user"` |
|
||||||
Password string `json:"password"` |
|
||||||
Path string `json:"path"` |
|
||||||
Protocol string `json:"protocol"` |
|
||||||
ID string `json:"id"` |
|
||||||
Action string `json:"action"` |
|
||||||
Query string `json:"query"` |
|
||||||
} |
|
||||||
err := json.NewDecoder(r.Body).Decode(&in) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
var user string |
|
||||||
if action == "publish" { |
|
||||||
user = "testpublisher" |
|
||||||
} else { |
|
||||||
user = "testreader" |
|
||||||
} |
|
||||||
|
|
||||||
if in.IP != "127.0.0.1" || |
|
||||||
in.User != user || |
|
||||||
in.Password != "testpass" || |
|
||||||
in.Path != "teststream" || |
|
||||||
in.Protocol != protocol || |
|
||||||
(firstReceived && in.ID == "") || |
|
||||||
in.Action != action || |
|
||||||
(in.Query != "user=testreader&pass=testpass¶m=value" && |
|
||||||
in.Query != "user=testpublisher&pass=testpass¶m=value" && |
|
||||||
in.Query != "param=value") { |
|
||||||
w.WriteHeader(http.StatusBadRequest) |
|
||||||
return |
|
||||||
} |
|
||||||
|
|
||||||
firstReceived = true |
|
||||||
}), |
|
||||||
} |
|
||||||
|
|
||||||
ln, err := net.Listen("tcp", "127.0.0.1:9120") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
go ts.Server.Serve(ln) |
|
||||||
|
|
||||||
return ts |
|
||||||
} |
|
||||||
|
|
||||||
func (ts *testHTTPAuthenticator) close() { |
|
||||||
ts.Server.Shutdown(context.Background()) |
|
||||||
} |
|
||||||
|
|
||||||
func TestRTMPServer(t *testing.T) { |
|
||||||
for _, encrypt := range []string{ |
|
||||||
"plain", |
|
||||||
"tls", |
|
||||||
} { |
|
||||||
for _, auth := range []string{ |
|
||||||
"none", |
|
||||||
"internal", |
|
||||||
"external", |
|
||||||
} { |
|
||||||
t.Run("encrypt_"+encrypt+"_auth_"+auth, func(t *testing.T) { |
|
||||||
var port string |
|
||||||
var conf string |
|
||||||
|
|
||||||
if encrypt == "plain" { |
|
||||||
port = "1935" |
|
||||||
|
|
||||||
conf = "rtsp: no\n" + |
|
||||||
"hls: no\n" |
|
||||||
} else { |
|
||||||
port = "1936" |
|
||||||
|
|
||||||
serverCertFpath, err := writeTempFile(serverCert) |
|
||||||
require.NoError(t, err) |
|
||||||
defer os.Remove(serverCertFpath) |
|
||||||
|
|
||||||
serverKeyFpath, err := writeTempFile(serverKey) |
|
||||||
require.NoError(t, err) |
|
||||||
defer os.Remove(serverKeyFpath) |
|
||||||
|
|
||||||
conf = "rtsp: no\n" + |
|
||||||
"hls: no\n" + |
|
||||||
"webrtc: no\n" + |
|
||||||
"rtmpEncryption: \"yes\"\n" + |
|
||||||
"rtmpServerCert: " + serverCertFpath + "\n" + |
|
||||||
"rtmpServerKey: " + serverKeyFpath + "\n" |
|
||||||
} |
|
||||||
|
|
||||||
switch auth { |
|
||||||
case "none": |
|
||||||
conf += "paths:\n" + |
|
||||||
" all_others:\n" |
|
||||||
|
|
||||||
case "internal": |
|
||||||
conf += "paths:\n" + |
|
||||||
" all_others:\n" + |
|
||||||
" publishUser: testpublisher\n" + |
|
||||||
" publishPass: testpass\n" + |
|
||||||
" publishIPs: [127.0.0.0/16]\n" + |
|
||||||
" readUser: testreader\n" + |
|
||||||
" readPass: testpass\n" + |
|
||||||
" readIPs: [127.0.0.0/16]\n" |
|
||||||
|
|
||||||
case "external": |
|
||||||
conf += "externalAuthenticationURL: http://localhost:9120/auth\n" + |
|
||||||
"paths:\n" + |
|
||||||
" all_others:\n" |
|
||||||
} |
|
||||||
|
|
||||||
p, ok := newInstance(conf) |
|
||||||
require.Equal(t, true, ok) |
|
||||||
defer p.Close() |
|
||||||
|
|
||||||
var a *testHTTPAuthenticator |
|
||||||
if auth == "external" { |
|
||||||
a = newTestHTTPAuthenticator(t, "rtmp", "publish") |
|
||||||
} |
|
||||||
|
|
||||||
u1, err := url.Parse("rtmp://127.0.0.1:" + port + "/teststream?user=testpublisher&pass=testpass¶m=value") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn1, err := func() (net.Conn, error) { |
|
||||||
if encrypt == "plain" { |
|
||||||
return net.Dial("tcp", u1.Host) |
|
||||||
} |
|
||||||
return tls.Dial("tcp", u1.Host, &tls.Config{InsecureSkipVerify: true}) |
|
||||||
}() |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn1.Close() |
|
||||||
|
|
||||||
conn1, err := rtmp.NewClientConn(nconn1, u1, true) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
w, err := rtmp.NewWriter(conn1, test.FormatH264, test.FormatMPEG4Audio) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond) |
|
||||||
|
|
||||||
if auth == "external" { |
|
||||||
a.close() |
|
||||||
a = newTestHTTPAuthenticator(t, "rtmp", "read") |
|
||||||
defer a.close() |
|
||||||
} |
|
||||||
|
|
||||||
u2, err := url.Parse("rtmp://127.0.0.1:" + port + "/teststream?user=testreader&pass=testpass¶m=value") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn2, err := func() (net.Conn, error) { |
|
||||||
if encrypt == "plain" { |
|
||||||
return net.Dial("tcp", u2.Host) |
|
||||||
} |
|
||||||
return tls.Dial("tcp", u2.Host, &tls.Config{InsecureSkipVerify: true}) |
|
||||||
}() |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn2.Close() |
|
||||||
|
|
||||||
conn2, err := rtmp.NewClientConn(nconn2, u2, false) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
r, err := rtmp.NewReader(conn2) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
videoTrack1, audioTrack2 := r.Tracks() |
|
||||||
require.Equal(t, test.FormatH264, videoTrack1) |
|
||||||
require.Equal(t, test.FormatMPEG4Audio, audioTrack2) |
|
||||||
|
|
||||||
err = w.WriteH264(0, 0, true, [][]byte{ |
|
||||||
{0x05, 0x02, 0x03, 0x04}, // IDR 1
|
|
||||||
{0x05, 0x02, 0x03, 0x04}, // IDR 2
|
|
||||||
}) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
r.OnDataH264(func(pts time.Duration, au [][]byte) { |
|
||||||
require.Equal(t, [][]byte{ |
|
||||||
test.FormatH264.SPS, |
|
||||||
test.FormatH264.PPS, |
|
||||||
{ // IDR 1
|
|
||||||
0x05, 0x02, 0x03, 0x04, |
|
||||||
}, |
|
||||||
{ // IDR 2
|
|
||||||
0x05, 0x02, 0x03, 0x04, |
|
||||||
}, |
|
||||||
}, au) |
|
||||||
}) |
|
||||||
|
|
||||||
err = r.Read() |
|
||||||
require.NoError(t, err) |
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func TestRTMPServerAuthFail(t *testing.T) { |
|
||||||
t.Run("publish", func(t *testing.T) { //nolint:dupl
|
|
||||||
p, ok := newInstance("rtsp: no\n" + |
|
||||||
"hls: no\n" + |
|
||||||
"webrtc: no\n" + |
|
||||||
"paths:\n" + |
|
||||||
" all_others:\n" + |
|
||||||
" publishUser: testuser2\n" + |
|
||||||
" publishPass: testpass\n") |
|
||||||
require.Equal(t, true, ok) |
|
||||||
defer p.Close() |
|
||||||
|
|
||||||
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser&pass=testpass") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn1, err := net.Dial("tcp", u1.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn1.Close() |
|
||||||
|
|
||||||
conn1, err := rtmp.NewClientConn(nconn1, u1, true) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
videoTrack := &format.H264{ |
|
||||||
PayloadTyp: 96, |
|
||||||
SPS: []byte{ |
|
||||||
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, |
|
||||||
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, |
|
||||||
0x00, 0x03, 0x00, 0x3d, 0x08, |
|
||||||
}, |
|
||||||
PPS: []byte{ |
|
||||||
0x68, 0xee, 0x3c, 0x80, |
|
||||||
}, |
|
||||||
PacketizationMode: 1, |
|
||||||
} |
|
||||||
|
|
||||||
_, err = rtmp.NewWriter(conn1, videoTrack, nil) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond) |
|
||||||
|
|
||||||
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn2, err := net.Dial("tcp", u2.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn2.Close() |
|
||||||
|
|
||||||
conn2, err := rtmp.NewClientConn(nconn2, u2, false) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
_, err = rtmp.NewReader(conn2) |
|
||||||
require.EqualError(t, err, "EOF") |
|
||||||
}) |
|
||||||
|
|
||||||
t.Run("publish_external", func(t *testing.T) { |
|
||||||
p, ok := newInstance("externalAuthenticationURL: http://localhost:9120/auth\n" + |
|
||||||
"paths:\n" + |
|
||||||
" all_others:\n") |
|
||||||
require.Equal(t, true, ok) |
|
||||||
defer p.Close() |
|
||||||
|
|
||||||
a := newTestHTTPAuthenticator(t, "rtmp", "publish") |
|
||||||
defer a.close() |
|
||||||
|
|
||||||
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn1, err := net.Dial("tcp", u1.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn1.Close() |
|
||||||
|
|
||||||
conn1, err := rtmp.NewClientConn(nconn1, u1, true) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
videoTrack := &format.H264{ |
|
||||||
PayloadTyp: 96, |
|
||||||
SPS: []byte{ |
|
||||||
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, |
|
||||||
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, |
|
||||||
0x00, 0x03, 0x00, 0x3d, 0x08, |
|
||||||
}, |
|
||||||
PPS: []byte{ |
|
||||||
0x68, 0xee, 0x3c, 0x80, |
|
||||||
}, |
|
||||||
PacketizationMode: 1, |
|
||||||
} |
|
||||||
|
|
||||||
_, err = rtmp.NewWriter(conn1, videoTrack, nil) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond) |
|
||||||
|
|
||||||
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn2, err := net.Dial("tcp", u2.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn2.Close() |
|
||||||
|
|
||||||
conn2, err := rtmp.NewClientConn(nconn2, u2, false) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
_, err = rtmp.NewReader(conn2) |
|
||||||
require.EqualError(t, err, "EOF") |
|
||||||
}) |
|
||||||
|
|
||||||
t.Run("read", func(t *testing.T) { //nolint:dupl
|
|
||||||
p, ok := newInstance("rtsp: no\n" + |
|
||||||
"hls: no\n" + |
|
||||||
"webrtc: no\n" + |
|
||||||
"paths:\n" + |
|
||||||
" all_others:\n" + |
|
||||||
" readUser: testuser2\n" + |
|
||||||
" readPass: testpass\n") |
|
||||||
require.Equal(t, true, ok) |
|
||||||
defer p.Close() |
|
||||||
|
|
||||||
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn1, err := net.Dial("tcp", u1.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn1.Close() |
|
||||||
|
|
||||||
conn1, err := rtmp.NewClientConn(nconn1, u1, true) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
videoTrack := &format.H264{ |
|
||||||
PayloadTyp: 96, |
|
||||||
SPS: []byte{ |
|
||||||
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, |
|
||||||
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, |
|
||||||
0x00, 0x03, 0x00, 0x3d, 0x08, |
|
||||||
}, |
|
||||||
PPS: []byte{ |
|
||||||
0x68, 0xee, 0x3c, 0x80, |
|
||||||
}, |
|
||||||
PacketizationMode: 1, |
|
||||||
} |
|
||||||
|
|
||||||
_, err = rtmp.NewWriter(conn1, videoTrack, nil) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond) |
|
||||||
|
|
||||||
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass") |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
nconn2, err := net.Dial("tcp", u2.Host) |
|
||||||
require.NoError(t, err) |
|
||||||
defer nconn2.Close() |
|
||||||
|
|
||||||
conn2, err := rtmp.NewClientConn(nconn2, u2, false) |
|
||||||
require.NoError(t, err) |
|
||||||
|
|
||||||
_, err = rtmp.NewReader(conn2) |
|
||||||
require.EqualError(t, err, "EOF") |
|
||||||
}) |
|
||||||
} |
|
||||||
@ -0,0 +1,290 @@ |
|||||||
|
package rtmp |
||||||
|
|
||||||
|
import ( |
||||||
|
"crypto/tls" |
||||||
|
"net" |
||||||
|
"net/url" |
||||||
|
"os" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/bluenviron/gortsplib/v4/pkg/description" |
||||||
|
"github.com/bluenviron/gortsplib/v4/pkg/format" |
||||||
|
"github.com/bluenviron/mediamtx/internal/asyncwriter" |
||||||
|
"github.com/bluenviron/mediamtx/internal/conf" |
||||||
|
"github.com/bluenviron/mediamtx/internal/defs" |
||||||
|
"github.com/bluenviron/mediamtx/internal/externalcmd" |
||||||
|
"github.com/bluenviron/mediamtx/internal/protocols/rtmp" |
||||||
|
"github.com/bluenviron/mediamtx/internal/stream" |
||||||
|
"github.com/bluenviron/mediamtx/internal/test" |
||||||
|
"github.com/bluenviron/mediamtx/internal/unit" |
||||||
|
"github.com/stretchr/testify/require" |
||||||
|
) |
||||||
|
|
||||||
|
func writeTempFile(byts []byte) (string, error) { |
||||||
|
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") |
||||||
|
if err != nil { |
||||||
|
return "", err |
||||||
|
} |
||||||
|
defer tmpf.Close() |
||||||
|
|
||||||
|
_, err = tmpf.Write(byts) |
||||||
|
if err != nil { |
||||||
|
return "", err |
||||||
|
} |
||||||
|
|
||||||
|
return tmpf.Name(), nil |
||||||
|
} |
||||||
|
|
||||||
|
type dummyPath struct { |
||||||
|
stream *stream.Stream |
||||||
|
streamCreated chan struct{} |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) Name() string { |
||||||
|
return "teststream" |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) SafeConf() *conf.Path { |
||||||
|
return &conf.Path{} |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { |
||||||
|
return externalcmd.Environment{} |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { |
||||||
|
var err error |
||||||
|
p.stream, err = stream.New( |
||||||
|
1460, |
||||||
|
req.Desc, |
||||||
|
true, |
||||||
|
test.NilLogger{}, |
||||||
|
) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
close(p.streamCreated) |
||||||
|
return p.stream, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) { |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) { |
||||||
|
} |
||||||
|
|
||||||
|
func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) { |
||||||
|
} |
||||||
|
|
||||||
|
type dummyPathManager struct { |
||||||
|
path *dummyPath |
||||||
|
} |
||||||
|
|
||||||
|
func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) { |
||||||
|
return pm.path, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (pm *dummyPathManager) AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { |
||||||
|
return pm.path, pm.path.stream, nil |
||||||
|
} |
||||||
|
|
||||||
|
func TestServerPublish(t *testing.T) { |
||||||
|
for _, encrypt := range []string{ |
||||||
|
"plain", |
||||||
|
"tls", |
||||||
|
} { |
||||||
|
t.Run(encrypt, func(t *testing.T) { |
||||||
|
var serverCertFpath string |
||||||
|
var serverKeyFpath string |
||||||
|
|
||||||
|
if encrypt == "tls" { |
||||||
|
var err error |
||||||
|
serverCertFpath, err = writeTempFile(test.TLSCertPub) |
||||||
|
require.NoError(t, err) |
||||||
|
defer os.Remove(serverCertFpath) |
||||||
|
|
||||||
|
serverKeyFpath, err = writeTempFile(test.TLSCertKey) |
||||||
|
require.NoError(t, err) |
||||||
|
defer os.Remove(serverKeyFpath) |
||||||
|
} |
||||||
|
|
||||||
|
path := &dummyPath{ |
||||||
|
streamCreated: make(chan struct{}), |
||||||
|
} |
||||||
|
|
||||||
|
pathManager := &dummyPathManager{path: path} |
||||||
|
|
||||||
|
s := &Server{ |
||||||
|
Address: "127.0.0.1:1935", |
||||||
|
ReadTimeout: conf.StringDuration(10 * time.Second), |
||||||
|
WriteTimeout: conf.StringDuration(10 * time.Second), |
||||||
|
WriteQueueSize: 512, |
||||||
|
IsTLS: encrypt == "tls", |
||||||
|
ServerCert: serverCertFpath, |
||||||
|
ServerKey: serverKeyFpath, |
||||||
|
RTSPAddress: "", |
||||||
|
RunOnConnect: "", |
||||||
|
RunOnConnectRestart: false, |
||||||
|
RunOnDisconnect: "", |
||||||
|
ExternalCmdPool: nil, |
||||||
|
PathManager: pathManager, |
||||||
|
Parent: &test.NilLogger{}, |
||||||
|
} |
||||||
|
err := s.Initialize() |
||||||
|
require.NoError(t, err) |
||||||
|
defer s.Close() |
||||||
|
|
||||||
|
u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testpublisher&pass=testpass¶m=value") |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
nconn, err := func() (net.Conn, error) { |
||||||
|
if encrypt == "plain" { |
||||||
|
return net.Dial("tcp", u.Host) |
||||||
|
} |
||||||
|
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) |
||||||
|
}() |
||||||
|
require.NoError(t, err) |
||||||
|
defer nconn.Close() |
||||||
|
|
||||||
|
conn, err := rtmp.NewClientConn(nconn, u, true) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
w, err := rtmp.NewWriter(conn, test.FormatH264, test.FormatMPEG4Audio) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
<-path.streamCreated |
||||||
|
|
||||||
|
aw := asyncwriter.New(512, &test.NilLogger{}) |
||||||
|
|
||||||
|
recv := make(chan struct{}) |
||||||
|
|
||||||
|
path.stream.AddReader(aw, |
||||||
|
path.stream.Desc().Medias[0], |
||||||
|
path.stream.Desc().Medias[0].Formats[0], |
||||||
|
func(u unit.Unit) error { |
||||||
|
require.Equal(t, [][]byte{ |
||||||
|
test.FormatH264.SPS, |
||||||
|
test.FormatH264.PPS, |
||||||
|
{5, 2, 3, 4}, |
||||||
|
}, u.(*unit.H264).AU) |
||||||
|
close(recv) |
||||||
|
return nil |
||||||
|
}) |
||||||
|
|
||||||
|
err = w.WriteH264(0, 0, true, [][]byte{ |
||||||
|
{5, 2, 3, 4}, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
aw.Start() |
||||||
|
|
||||||
|
<-recv |
||||||
|
|
||||||
|
aw.Stop() |
||||||
|
}) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func TestServerRead(t *testing.T) { |
||||||
|
for _, encrypt := range []string{ |
||||||
|
"plain", |
||||||
|
"tls", |
||||||
|
} { |
||||||
|
t.Run(encrypt, func(t *testing.T) { |
||||||
|
var serverCertFpath string |
||||||
|
var serverKeyFpath string |
||||||
|
|
||||||
|
if encrypt == "tls" { |
||||||
|
var err error |
||||||
|
serverCertFpath, err = writeTempFile(test.TLSCertPub) |
||||||
|
require.NoError(t, err) |
||||||
|
defer os.Remove(serverCertFpath) |
||||||
|
|
||||||
|
serverKeyFpath, err = writeTempFile(test.TLSCertKey) |
||||||
|
require.NoError(t, err) |
||||||
|
defer os.Remove(serverKeyFpath) |
||||||
|
} |
||||||
|
|
||||||
|
testMediaH264 := &description.Media{ |
||||||
|
Type: description.MediaTypeVideo, |
||||||
|
Formats: []format.Format{test.FormatH264}, |
||||||
|
} |
||||||
|
|
||||||
|
desc := &description.Session{Medias: []*description.Media{testMediaH264}} |
||||||
|
|
||||||
|
stream, err := stream.New( |
||||||
|
1460, |
||||||
|
desc, |
||||||
|
true, |
||||||
|
test.NilLogger{}, |
||||||
|
) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
path := &dummyPath{stream: stream} |
||||||
|
|
||||||
|
pathManager := &dummyPathManager{path: path} |
||||||
|
|
||||||
|
s := &Server{ |
||||||
|
Address: "127.0.0.1:1935", |
||||||
|
ReadTimeout: conf.StringDuration(10 * time.Second), |
||||||
|
WriteTimeout: conf.StringDuration(10 * time.Second), |
||||||
|
WriteQueueSize: 512, |
||||||
|
IsTLS: encrypt == "tls", |
||||||
|
ServerCert: serverCertFpath, |
||||||
|
ServerKey: serverKeyFpath, |
||||||
|
RTSPAddress: "", |
||||||
|
RunOnConnect: "", |
||||||
|
RunOnConnectRestart: false, |
||||||
|
RunOnDisconnect: "", |
||||||
|
ExternalCmdPool: nil, |
||||||
|
PathManager: pathManager, |
||||||
|
Parent: &test.NilLogger{}, |
||||||
|
} |
||||||
|
err = s.Initialize() |
||||||
|
require.NoError(t, err) |
||||||
|
defer s.Close() |
||||||
|
|
||||||
|
u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testreader&pass=testpass¶m=value") |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
nconn, err := func() (net.Conn, error) { |
||||||
|
if encrypt == "plain" { |
||||||
|
return net.Dial("tcp", u.Host) |
||||||
|
} |
||||||
|
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) |
||||||
|
}() |
||||||
|
require.NoError(t, err) |
||||||
|
defer nconn.Close() |
||||||
|
|
||||||
|
conn, err := rtmp.NewClientConn(nconn, u, false) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
r, err := rtmp.NewReader(conn) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
videoTrack, _ := r.Tracks() |
||||||
|
require.Equal(t, test.FormatH264, videoTrack) |
||||||
|
|
||||||
|
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ |
||||||
|
Base: unit.Base{ |
||||||
|
NTP: time.Time{}, |
||||||
|
}, |
||||||
|
AU: [][]byte{ |
||||||
|
{5, 2, 3, 4}, // IDR
|
||||||
|
}, |
||||||
|
}) |
||||||
|
|
||||||
|
r.OnDataH264(func(pts time.Duration, au [][]byte) { |
||||||
|
require.Equal(t, [][]byte{ |
||||||
|
test.FormatH264.SPS, |
||||||
|
test.FormatH264.PPS, |
||||||
|
{5, 2, 3, 4}, |
||||||
|
}, au) |
||||||
|
}) |
||||||
|
|
||||||
|
err = r.Read() |
||||||
|
require.NoError(t, err) |
||||||
|
}) |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,55 @@ |
|||||||
|
package test |
||||||
|
|
||||||
|
// TLSCertPub is the public key of a test certificate.
|
||||||
|
var TLSCertPub = []byte(`-----BEGIN CERTIFICATE----- |
||||||
|
MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL |
||||||
|
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM |
||||||
|
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy |
||||||
|
MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw |
||||||
|
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB |
||||||
|
AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj |
||||||
|
zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv |
||||||
|
NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp |
||||||
|
OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I |
||||||
|
qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e |
||||||
|
nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud |
||||||
|
DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a |
||||||
|
u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj |
||||||
|
3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO |
||||||
|
xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu |
||||||
|
tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI |
||||||
|
XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 |
||||||
|
7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd |
||||||
|
XQxaORfgM//NzX9LhUPk
|
||||||
|
-----END CERTIFICATE----- |
||||||
|
`) |
||||||
|
|
||||||
|
// TLSCertKey is the private key of a test certificate.
|
||||||
|
var TLSCertKey = []byte(`-----BEGIN RSA PRIVATE KEY----- |
||||||
|
MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ |
||||||
|
KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y |
||||||
|
1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY |
||||||
|
cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 |
||||||
|
6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE |
||||||
|
CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC |
||||||
|
kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT |
||||||
|
kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP |
||||||
|
bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S |
||||||
|
Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj |
||||||
|
5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb |
||||||
|
agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ |
||||||
|
M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 |
||||||
|
ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz |
||||||
|
ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl |
||||||
|
+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX |
||||||
|
4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp |
||||||
|
xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj |
||||||
|
7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf |
||||||
|
3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a |
||||||
|
r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO |
||||||
|
y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD |
||||||
|
94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK |
||||||
|
6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 |
||||||
|
+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= |
||||||
|
-----END RSA PRIVATE KEY----- |
||||||
|
`) |
||||||
Loading…
Reference in new issue