diff --git a/internal/conf/credential.go b/internal/conf/credential.go index 7b6d4739..14b19cd1 100644 --- a/internal/conf/credential.go +++ b/internal/conf/credential.go @@ -18,14 +18,18 @@ var ( const plainCredentialSupportedChars = "A-Z,0-9,!,$,(,),*,+,.,;,<,=,>,[,],^,_,-,\",\",@,#,&" -// Credential is a parameter that is used as username or password. -type Credential struct { - value string +func sha256Base64(in string) string { + h := sha256.New() + h.Write([]byte(in)) + return base64.StdEncoding.EncodeToString(h.Sum(nil)) } +// Credential is a parameter that is used as username or password. +type Credential string + // MarshalJSON implements json.Marshaler. func (d Credential) MarshalJSON() ([]byte, error) { - return json.Marshal(d.value) + return json.Marshal(string(d)) } // UnmarshalJSON implements json.Unmarshaler. @@ -35,9 +39,7 @@ func (d *Credential) UnmarshalJSON(b []byte) error { return err } - *d = Credential{ - value: in, - } + *d = Credential(in) return d.validate() } @@ -47,72 +49,57 @@ func (d *Credential) UnmarshalEnv(_ string, v string) error { return d.UnmarshalJSON([]byte(`"` + v + `"`)) } -// GetValue returns the value of the credential. -func (d *Credential) GetValue() string { - return d.value -} - -// IsEmpty returns true if the credential is not configured. -func (d *Credential) IsEmpty() bool { - return d.value == "" -} - // IsSha256 returns true if the credential is a sha256 hash. -func (d *Credential) IsSha256() bool { - return d.value != "" && strings.HasPrefix(d.value, "sha256:") +func (d Credential) IsSha256() bool { + return strings.HasPrefix(string(d), "sha256:") } // IsArgon2 returns true if the credential is an argon2 hash. -func (d *Credential) IsArgon2() bool { - return d.value != "" && strings.HasPrefix(d.value, "argon2:") +func (d Credential) IsArgon2() bool { + return strings.HasPrefix(string(d), "argon2:") } // IsHashed returns true if the credential is a sha256 or argon2 hash. -func (d *Credential) IsHashed() bool { +func (d Credential) IsHashed() bool { return d.IsSha256() || d.IsArgon2() } -func sha256Base64(in string) string { - h := sha256.New() - h.Write([]byte(in)) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - // Check returns true if the given value matches the credential. -func (d *Credential) Check(guess string) bool { +func (d Credential) Check(guess string) bool { if d.IsSha256() { - return d.value[len("sha256:"):] == sha256Base64(guess) + return string(d)[len("sha256:"):] == sha256Base64(guess) } + if d.IsArgon2() { // TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go: // https://go-review.googlesource.com/c/crypto/+/502515 - ok, err := argon2.VerifyEncoded([]byte(guess), []byte(d.value[len("argon2:"):])) + ok, err := argon2.VerifyEncoded([]byte(guess), []byte(string(d)[len("argon2:"):])) return ok && err == nil } - if d.IsEmpty() { - // when no credential is set, any value is valid - return true + + if d != "" { + return string(d) == guess } - return d.value == guess + return true } -func (d *Credential) validate() error { - if !d.IsEmpty() { +func (d Credential) validate() error { + if d != "" { switch { case d.IsSha256(): - if !reBase64.MatchString(d.value) { + if !reBase64.MatchString(string(d)) { return fmt.Errorf("credential contains unsupported characters, sha256 hash must be base64 encoded") } case d.IsArgon2(): // TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go: // https://go-review.googlesource.com/c/crypto/+/502515 - _, err := argon2.Decode([]byte(d.value[len("argon2:"):])) + _, err := argon2.Decode([]byte(string(d)[len("argon2:"):])) if err != nil { return fmt.Errorf("invalid argon2 hash: %w", err) } default: - if !rePlainCredential.MatchString(d.value) { + if !rePlainCredential.MatchString(string(d)) { return fmt.Errorf("credential contains unsupported characters. Supported are: %s", plainCredentialSupportedChars) } } diff --git a/internal/conf/credential_test.go b/internal/conf/credential_test.go index ef325672..4c622aaa 100644 --- a/internal/conf/credential_test.go +++ b/internal/conf/credential_test.go @@ -8,7 +8,7 @@ import ( func TestCredential(t *testing.T) { t.Run("MarshalJSON", func(t *testing.T) { - cred := Credential{value: "password"} + cred := Credential("password") expectedJSON := []byte(`"password"`) actualJSON, err := cred.MarshalJSON() assert.NoError(t, err) @@ -16,7 +16,7 @@ func TestCredential(t *testing.T) { }) t.Run("UnmarshalJSON", func(t *testing.T) { - expectedCred := Credential{value: "password"} + expectedCred := Credential("password") jsonData := []byte(`"password"`) var actualCred Credential err := actualCred.UnmarshalJSON(jsonData) @@ -25,79 +25,63 @@ func TestCredential(t *testing.T) { }) t.Run("UnmarshalEnv", func(t *testing.T) { - cred := Credential{} + cred := Credential("") err := cred.UnmarshalEnv("", "password") assert.NoError(t, err) - assert.Equal(t, "password", cred.value) - }) - - t.Run("GetValue", func(t *testing.T) { - cred := Credential{value: "password"} - actualValue := cred.GetValue() - assert.Equal(t, "password", actualValue) - }) - - t.Run("IsEmpty", func(t *testing.T) { - cred := Credential{} - assert.True(t, cred.IsEmpty()) - assert.False(t, cred.IsHashed()) - - cred.value = "password" - assert.False(t, cred.IsEmpty()) - assert.False(t, cred.IsHashed()) + assert.Equal(t, Credential("password"), cred) }) t.Run("IsSha256", func(t *testing.T) { - cred := Credential{} + cred := Credential("") assert.False(t, cred.IsSha256()) assert.False(t, cred.IsHashed()) - cred.value = "sha256:j1tsRqDEw9xvq/D7/9tMx6Jh/jMhk3UfjwIB2f1zgMo=" + cred = "sha256:j1tsRqDEw9xvq/D7/9tMx6Jh/jMhk3UfjwIB2f1zgMo=" assert.True(t, cred.IsSha256()) assert.True(t, cred.IsHashed()) - cred.value = "argon2:$argon2id$v=19$m=65536,t=1," + + cred = "argon2:$argon2id$v=19$m=65536,t=1," + "p=4$WXJGqwIB2qd+pRmxMOw9Dg$X4gvR0ZB2DtQoN8vOnJPR2SeFdUhH9TyVzfV98sfWeE" assert.False(t, cred.IsSha256()) assert.True(t, cred.IsHashed()) }) t.Run("IsArgon2", func(t *testing.T) { - cred := Credential{} + cred := Credential("") assert.False(t, cred.IsArgon2()) assert.False(t, cred.IsHashed()) - cred.value = "sha256:j1tsRqDEw9xvq/D7/9tMx6Jh/jMhk3UfjwIB2f1zgMo=" + cred = "sha256:j1tsRqDEw9xvq/D7/9tMx6Jh/jMhk3UfjwIB2f1zgMo=" assert.False(t, cred.IsArgon2()) assert.True(t, cred.IsHashed()) - cred.value = "argon2:$argon2id$v=19$m=65536,t=1," + + cred = "argon2:$argon2id$v=19$m=65536,t=1," + "p=4$WXJGqwIB2qd+pRmxMOw9Dg$X4gvR0ZB2DtQoN8vOnJPR2SeFdUhH9TyVzfV98sfWeE" assert.True(t, cred.IsArgon2()) assert.True(t, cred.IsHashed()) }) t.Run("Check-plain", func(t *testing.T) { - cred := Credential{value: "password"} + cred := Credential("password") assert.True(t, cred.Check("password")) assert.False(t, cred.Check("wrongpassword")) }) t.Run("Check-sha256", func(t *testing.T) { - cred := Credential{value: "password"} + cred := Credential("password") assert.True(t, cred.Check("password")) assert.False(t, cred.Check("wrongpassword")) }) t.Run("Check-sha256", func(t *testing.T) { - cred := Credential{value: "sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ="} + cred := Credential("sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ=") assert.True(t, cred.Check("testuser")) assert.False(t, cred.Check("notestuser")) }) t.Run("Check-argon2", func(t *testing.T) { - cred := Credential{value: "argon2:$argon2id$v=19$m=4096,t=3," + - "p=1$MTIzNDU2Nzg$Ux/LWeTgJQPyfMMJo1myR64+o8rALHoPmlE1i/TR+58"} + cred := Credential("argon2:$argon2id$v=19$m=4096,t=3," + + "p=1$MTIzNDU2Nzg$Ux/LWeTgJQPyfMMJo1myR64+o8rALHoPmlE1i/TR+58") assert.True(t, cred.Check("testuser")) assert.False(t, cred.Check("notestuser")) }) @@ -105,50 +89,50 @@ func TestCredential(t *testing.T) { t.Run("validate", func(t *testing.T) { tests := []struct { name string - cred *Credential + cred Credential wantErr bool }{ { name: "Empty credential", - cred: &Credential{value: ""}, + cred: Credential(""), wantErr: false, }, { name: "Valid plain credential", - cred: &Credential{value: "validPlain123"}, + cred: Credential("validPlain123"), wantErr: false, }, { name: "Invalid plain credential", - cred: &Credential{value: "invalid/Plain"}, + cred: Credential("invalid/Plain"), wantErr: true, }, { name: "Valid sha256 credential", - cred: &Credential{value: "sha256:validBase64EncodedHash=="}, + cred: Credential("sha256:validBase64EncodedHash=="), wantErr: false, }, { name: "Invalid sha256 credential", - cred: &Credential{value: "sha256:inval*idBase64"}, + cred: Credential("sha256:inval*idBase64"), wantErr: true, }, { name: "Valid Argon2 credential", - cred: &Credential{value: "argon2:$argon2id$v=19$m=4096," + - "t=3,p=1$MTIzNDU2Nzg$zarsL19s86GzUWlAkvwt4gJBFuU/A9CVuCjNI4fksow"}, + cred: Credential("argon2:$argon2id$v=19$m=4096," + + "t=3,p=1$MTIzNDU2Nzg$zarsL19s86GzUWlAkvwt4gJBFuU/A9CVuCjNI4fksow"), wantErr: false, }, { name: "Invalid Argon2 credential", - cred: &Credential{value: "argon2:invalid"}, + cred: Credential("argon2:invalid"), wantErr: true, }, { name: "Invalid Argon2 credential", // testing argon2d errors, because it's not supported - cred: &Credential{value: "$argon2d$v=19$m=4096,t=3," + - "p=1$MTIzNDU2Nzg$Xqyd4R7LzXvvAEHaVU12+Nzf5OkHoYcwIEIIYJUDpz0"}, + cred: Credential("$argon2d$v=19$m=4096,t=3," + + "p=1$MTIzNDU2Nzg$Xqyd4R7LzXvvAEHaVU12+Nzf5OkHoYcwIEIIYJUDpz0"), wantErr: true, }, } diff --git a/internal/conf/path.go b/internal/conf/path.go index 9e641f3d..2420db12 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -377,11 +377,11 @@ func (pconf *Path) validate(conf *Conf, name string) error { // Authentication - if (!pconf.PublishUser.IsEmpty() && pconf.PublishPass.IsEmpty()) || - (pconf.PublishUser.IsEmpty() && !pconf.PublishPass.IsEmpty()) { + if (pconf.PublishUser != "" && pconf.PublishPass == "") || + (pconf.PublishUser == "" && pconf.PublishPass != "") { return fmt.Errorf("read username and password must be both filled") } - if !pconf.PublishUser.IsEmpty() && pconf.Source != "publisher" { + if pconf.PublishUser != "" && pconf.Source != "publisher" { return fmt.Errorf("'publishUser' is useless when source is not 'publisher', since " + "the stream is not provided by a publisher, but by a fixed source") } @@ -389,8 +389,8 @@ func (pconf *Path) validate(conf *Conf, name string) error { return fmt.Errorf("'publishIPs' is useless when source is not 'publisher', since " + "the stream is not provided by a publisher, but by a fixed source") } - if (!pconf.ReadUser.IsEmpty() && pconf.ReadPass.IsEmpty()) || - (pconf.ReadUser.IsEmpty() && !pconf.ReadPass.IsEmpty()) { + if (pconf.ReadUser != "" && pconf.ReadPass == "") || + (pconf.ReadUser == "" && pconf.ReadPass != "") { return fmt.Errorf("read username and password must be both filled") } if contains(conf.AuthMethods, headers.AuthDigest) { @@ -402,9 +402,9 @@ func (pconf *Path) validate(conf *Conf, name string) error { } } if conf.ExternalAuthenticationURL != "" { - if !pconf.PublishUser.IsEmpty() || + if pconf.PublishUser != "" || len(pconf.PublishIPs) > 0 || - !pconf.ReadUser.IsEmpty() || + pconf.ReadUser != "" || len(pconf.ReadIPs) > 0 { return fmt.Errorf("credentials or IPs can't be used together with 'externalAuthenticationURL'") } diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 5e262bc1..5b13fb23 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -17,7 +17,6 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" "github.com/google/uuid" @@ -29,16 +28,6 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) -var testMediaH264 = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, -} - -var testMediaAAC = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{test.FormatMPEG4Audio}, -} - func checkClose(t *testing.T, closeFunc func() error) { require.NoError(t, closeFunc()) } @@ -110,14 +99,14 @@ func TestAPIPathsList(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - media0 := testMediaH264 + media0 := test.UniqueMediaH264() source := gortsplib.Client{} err := source.StartRecording( "rtsp://localhost:8554/mypath", &description.Session{Medias: []*description.Media{ media0, - testMediaAAC, + test.MediaMPEG4Audio, }}) require.NoError(t, err) defer source.Close() @@ -171,8 +160,8 @@ func TestAPIPathsList(t *testing.T) { source := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} err = source.StartRecording("rtsps://localhost:8322/mypath", &description.Session{Medias: []*description.Media{ - testMediaH264, - testMediaAAC, + test.UniqueMediaH264(), + test.UniqueMediaMPEG4Audio(), }}) require.NoError(t, err) defer source.Close() @@ -313,7 +302,7 @@ func TestAPIPathsGet(t *testing.T) { if ca == "ok" || ca == "ok-nested" { source := gortsplib.Client{} err := source.StartRecording("rtsp://localhost:8554/"+pathName, - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer source.Close() @@ -384,7 +373,7 @@ func TestAPIProtocolListGet(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - medi := testMediaH264 + medi := test.UniqueMediaH264() switch ca { //nolint:dupl case "rtsp conns", "rtsp sessions": @@ -947,7 +936,7 @@ func TestAPIProtocolKick(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - medi := testMediaH264 + medi := test.MediaH264 switch ca { case "rtsp": diff --git a/internal/core/auth.go b/internal/core/auth.go index dbe2abf0..32126a95 100644 --- a/internal/core/auth.go +++ b/internal/core/auth.go @@ -104,12 +104,12 @@ func doAuthentication( } } - if !pathUser.IsEmpty() { + if pathUser != "" { if accessRequest.RTSPRequest != nil && rtspAuth.Method == headers.AuthDigest { err := auth.Validate( accessRequest.RTSPRequest, - pathUser.GetValue(), - pathPass.GetValue(), + string(pathUser), + string(pathPass), accessRequest.RTSPBaseURL, rtspAuthMethods, "IPCAM", diff --git a/internal/core/auth_test.go b/internal/core/auth_test.go new file mode 100644 index 00000000..9a373164 --- /dev/null +++ b/internal/core/auth_test.go @@ -0,0 +1,155 @@ +package core + +import ( + "context" + "encoding/json" + "net" + "net/http" + "testing" + + "github.com/bluenviron/gortsplib/v4/pkg/headers" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/stretchr/testify/require" +) + +type testHTTPAuthenticator struct { + *http.Server +} + +func (ts *testHTTPAuthenticator) initialize(t *testing.T, protocol string, action string) { + firstReceived := false + + ts.Server = &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "/auth", r.URL.Path) + + var in struct { + IP string `json:"ip"` + User string `json:"user"` + Password string `json:"password"` + Path string `json:"path"` + Protocol string `json:"protocol"` + ID string `json:"id"` + Action string `json:"action"` + Query string `json:"query"` + } + err := json.NewDecoder(r.Body).Decode(&in) + require.NoError(t, err) + + var user string + if action == "publish" { + user = "testpublisher" + } else { + user = "testreader" + } + + if in.IP != "127.0.0.1" || + in.User != user || + in.Password != "testpass" || + in.Path != "teststream" || + in.Protocol != protocol || + (firstReceived && in.ID == "") || + in.Action != action || + (in.Query != "user=testreader&pass=testpass¶m=value" && + in.Query != "user=testpublisher&pass=testpass¶m=value" && + in.Query != "param=value") { + w.WriteHeader(http.StatusBadRequest) + return + } + + firstReceived = true + }), + } + + ln, err := net.Listen("tcp", "127.0.0.1:9120") + require.NoError(t, err) + + go ts.Server.Serve(ln) +} + +func (ts *testHTTPAuthenticator) close() { + ts.Server.Shutdown(context.Background()) +} + +func TestAuthSha256(t *testing.T) { + err := doAuthentication( + "", + conf.AuthMethods{headers.AuthBasic}, + &conf.Path{ + PublishUser: conf.Credential("sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ="), + PublishPass: conf.Credential("sha256:E9JJ8stBJ7QM+nV4ZoUCeHk/gU3tPFh/5YieiJp6n2w="), + }, + defs.PathAccessRequest{ + Name: "mypath", + Query: "", + Publish: true, + SkipAuth: false, + IP: net.ParseIP("127.0.0.1"), + User: "testuser", + Pass: "testpass", + Proto: defs.AuthProtocolRTSP, + ID: nil, + RTSPRequest: nil, + RTSPBaseURL: nil, + RTSPNonce: "", + }, + ) + require.NoError(t, err) +} + +func TestAuthArgon2(t *testing.T) { + err := doAuthentication( + "", + conf.AuthMethods{headers.AuthBasic}, + &conf.Path{ + PublishUser: conf.Credential( + "argon2:$argon2id$v=19$m=4096,t=3,p=1$MTIzNDU2Nzg$Ux/LWeTgJQPyfMMJo1myR64+o8rALHoPmlE1i/TR+58"), + PublishPass: conf.Credential( + "argon2:$argon2i$v=19$m=4096,t=3,p=1$MTIzNDU2Nzg$/mrZ42TiTv1mcPnpMUera5oi0SFYbbyueAbdx5sUvWo"), + }, + defs.PathAccessRequest{ + Name: "mypath", + Query: "", + Publish: true, + SkipAuth: false, + IP: net.ParseIP("127.0.0.1"), + User: "testuser", + Pass: "testpass", + Proto: defs.AuthProtocolRTSP, + ID: nil, + RTSPRequest: nil, + RTSPBaseURL: nil, + RTSPNonce: "", + }, + ) + require.NoError(t, err) +} + +func TestAuthExternal(t *testing.T) { + au := &testHTTPAuthenticator{} + au.initialize(t, "rtsp", "publish") + defer au.close() + + err := doAuthentication( + "http://127.0.0.1:9120/auth", + conf.AuthMethods{headers.AuthBasic}, + &conf.Path{}, + defs.PathAccessRequest{ + Name: "teststream", + Query: "param=value", + Publish: true, + SkipAuth: false, + IP: net.ParseIP("127.0.0.1"), + User: "testpublisher", + Pass: "testpass", + Proto: defs.AuthProtocolRTSP, + ID: nil, + RTSPRequest: nil, + RTSPBaseURL: nil, + RTSPNonce: "", + }, + ) + require.NoError(t, err) +} diff --git a/internal/core/core_test.go b/internal/core/core_test.go index 33ee36d4..7123d287 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -110,11 +110,9 @@ func TestCoreHotReloading(t *testing.T) { defer p.Close() func() { - medi := testMediaH264 - c := gortsplib.Client{} err = c.StartRecording("rtsp://localhost:8554/test1", - &description.Session{Medias: []*description.Media{medi}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.EqualError(t, err, "bad status code: 401 (Unauthorized)") }() @@ -126,11 +124,9 @@ func TestCoreHotReloading(t *testing.T) { time.Sleep(1 * time.Second) func() { - medi := testMediaH264 - conn := gortsplib.Client{} err = conn.StartRecording("rtsp://localhost:8554/test1", - &description.Session{Medias: []*description.Media{medi}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer conn.Close() }() diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 8d50a1ca..9bcb7461 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -15,7 +15,6 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" "github.com/pion/rtp" @@ -110,10 +109,7 @@ webrtc_sessions_bytes_sent 0 defer wg.Done() source := gortsplib.Client{} err := source.StartRecording("rtsp://localhost:8554/rtsp_path", - &description.Session{Medias: []*description.Media{{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - }}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer source.Close() <-terminate @@ -123,10 +119,7 @@ webrtc_sessions_bytes_sent 0 defer wg.Done() source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} err := source2.StartRecording("rtsps://localhost:8322/rtsps_path", - &description.Session{Medias: []*description.Media{{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - }}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer source2.Close() <-terminate @@ -178,7 +171,7 @@ webrtc_sessions_bytes_sent 0 Log: test.NilLogger{}, } - tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil) + tracks, err := s.Publish(context.Background(), test.MediaH264.Formats[0], nil) require.NoError(t, err) defer checkClose(t, s.Close) diff --git a/internal/core/path_manager_test.go b/internal/core/path_manager_test.go index 85d14c8d..6b506fce 100644 --- a/internal/core/path_manager_test.go +++ b/internal/core/path_manager_test.go @@ -7,12 +7,9 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/headers" - "github.com/bluenviron/mediamtx/internal/defs" "github.com/stretchr/testify/require" ) -var _ defs.PathManager = &pathManager{} - func TestPathAutoDeletion(t *testing.T) { for _, ca := range []string{"describe", "setup"} { t.Run(ca, func(t *testing.T) { diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 30a90d73..4da688be 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -242,7 +242,7 @@ func TestPathRunOnConnect(t *testing.T) { err := c.StartRecording( "rtsp://localhost:8554/test", - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer c.Close() @@ -302,9 +302,10 @@ func TestPathRunOnReady(t *testing.T) { defer p.Close() c := gortsplib.Client{} + err := c.StartRecording( "rtsp://localhost:8554/test?query=value", - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer c.Close() @@ -339,10 +340,13 @@ func TestPathRunOnRead(t *testing.T) { require.Equal(t, true, ok) defer p.Close() + media0 := test.UniqueMediaH264() + source := gortsplib.Client{} + err := source.StartRecording( "rtsp://localhost:8554/test", - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{media0}}) require.NoError(t, err) defer source.Close() @@ -419,7 +423,7 @@ func TestPathRunOnRead(t *testing.T) { case <-writerTerminate: return } - err := source.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := source.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, @@ -462,11 +466,12 @@ func TestPathMaxReaders(t *testing.T) { defer p.Close() source := gortsplib.Client{} + err := source.StartRecording( "rtsp://localhost:8554/mystream", &description.Session{Medias: []*description.Media{ - testMediaH264, - testMediaAAC, + test.UniqueMediaH264(), + test.UniqueMediaMPEG4Audio(), }}) require.NoError(t, err) defer source.Close() @@ -507,15 +512,18 @@ func TestPathRecord(t *testing.T) { require.Equal(t, true, ok) defer p.Close() + media0 := test.UniqueMediaH264() + source := gortsplib.Client{} + err = source.StartRecording( "rtsp://localhost:8554/mystream", - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{media0}}) require.NoError(t, err) defer source.Close() for i := 0; i < 4; i++ { - err := source.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := source.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, @@ -550,7 +558,7 @@ func TestPathRecord(t *testing.T) { time.Sleep(500 * time.Millisecond) for i := 4; i < 8; i++ { - err := source.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := source.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, @@ -607,7 +615,7 @@ func TestPathFallback(t *testing.T) { source := gortsplib.Client{} err := source.StartRecording("rtsp://localhost:8554/path2", - &description.Session{Medias: []*description.Media{testMediaH264}}) + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) require.NoError(t, err) defer source.Close() @@ -656,7 +664,7 @@ func TestPathSourceRegexp(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{testMediaH264}}) + stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{test.MediaH264}}) defer stream.Close() p, ok := newInstance( @@ -680,3 +688,106 @@ func TestPathSourceRegexp(t *testing.T) { _, _, err = reader.Describe(u) require.NoError(t, err) } + +func TestPathOverridePublisher(t *testing.T) { + for _, ca := range []string{ + "enabled", + "disabled", + } { + t.Run(ca, func(t *testing.T) { + conf := "rtmp: no\n" + + "paths:\n" + + " all_others:\n" + + if ca == "disabled" { + conf += " overridePublisher: no\n" + } + + p, ok := newInstance(conf) + require.Equal(t, true, ok) + defer p.Close() + + medi := test.UniqueMediaH264() + + s1 := gortsplib.Client{} + + err := s1.StartRecording("rtsp://localhost:8554/teststream", + &description.Session{Medias: []*description.Media{medi}}) + require.NoError(t, err) + defer s1.Close() + + s2 := gortsplib.Client{} + + err = s2.StartRecording("rtsp://localhost:8554/teststream", + &description.Session{Medias: []*description.Media{medi}}) + if ca == "enabled" { + require.NoError(t, err) + defer s2.Close() + } else { + require.Error(t, err) + } + + frameRecv := make(chan struct{}) + + c := gortsplib.Client{} + + u, err := base.ParseURL("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + err = c.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer c.Close() + + desc, _, err := c.Describe(u) + require.NoError(t, err) + + err = c.SetupAll(desc.BaseURL, desc.Medias) + require.NoError(t, err) + + c.OnPacketRTP(desc.Medias[0], desc.Medias[0].Formats[0], func(pkt *rtp.Packet) { + if ca == "enabled" { + require.Equal(t, []byte{5, 15, 16, 17, 18}, pkt.Payload) + } else { + require.Equal(t, []byte{5, 11, 12, 13, 14}, pkt.Payload) + } + close(frameRecv) + }) + + _, err = c.Play(nil) + require.NoError(t, err) + + if ca == "enabled" { + err := s1.Wait() + require.EqualError(t, err, "EOF") + + err = s2.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 0x02, + PayloadType: 96, + SequenceNumber: 57899, + Timestamp: 345234345, + SSRC: 978651231, + Marker: true, + }, + Payload: []byte{5, 15, 16, 17, 18}, + }) + require.NoError(t, err) + } else { + err = s1.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 0x02, + PayloadType: 96, + SequenceNumber: 57899, + Timestamp: 345234345, + SSRC: 978651231, + Marker: true, + }, + Payload: []byte{5, 11, 12, 13, 14}, + }) + require.NoError(t, err) + } + + <-frameRecv + }) + } +} diff --git a/internal/core/rtsp_server_test.go b/internal/core/rtsp_server_test.go deleted file mode 100644 index 276df921..00000000 --- a/internal/core/rtsp_server_test.go +++ /dev/null @@ -1,441 +0,0 @@ -package core - -import ( - "context" - "encoding/json" - "net" - "net/http" - "testing" - - "github.com/bluenviron/gortsplib/v4" - "github.com/bluenviron/gortsplib/v4/pkg/base" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/pion/rtp" - "github.com/stretchr/testify/require" -) - -type testHTTPAuthenticator struct { - *http.Server -} - -func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator { - firstReceived := false - - ts := &testHTTPAuthenticator{} - - ts.Server = &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodPost, r.Method) - require.Equal(t, "/auth", r.URL.Path) - - var in struct { - IP string `json:"ip"` - User string `json:"user"` - Password string `json:"password"` - Path string `json:"path"` - Protocol string `json:"protocol"` - ID string `json:"id"` - Action string `json:"action"` - Query string `json:"query"` - } - err := json.NewDecoder(r.Body).Decode(&in) - require.NoError(t, err) - - var user string - if action == "publish" { - user = "testpublisher" - } else { - user = "testreader" - } - - if in.IP != "127.0.0.1" || - in.User != user || - in.Password != "testpass" || - in.Path != "teststream" || - in.Protocol != protocol || - (firstReceived && in.ID == "") || - in.Action != action || - (in.Query != "user=testreader&pass=testpass¶m=value" && - in.Query != "user=testpublisher&pass=testpass¶m=value" && - in.Query != "param=value") { - w.WriteHeader(http.StatusBadRequest) - return - } - - firstReceived = true - }), - } - - ln, err := net.Listen("tcp", "127.0.0.1:9120") - require.NoError(t, err) - - go ts.Server.Serve(ln) - - return ts -} - -func (ts *testHTTPAuthenticator) close() { - ts.Server.Shutdown(context.Background()) -} - -func TestRTSPServer(t *testing.T) { - for _, auth := range []string{ - "none", - "internal", - "external", - } { - t.Run("auth_"+auth, func(t *testing.T) { - var conf string - - switch auth { - case "none": - conf = "paths:\n" + - " all_others:\n" - - case "internal": - conf = "rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishUser: testpublisher\n" + - " publishPass: testpass\n" + - " publishIPs: [127.0.0.0/16]\n" + - " readUser: testreader\n" + - " readPass: testpass\n" + - " readIPs: [127.0.0.0/16]\n" - - case "external": - conf = "externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - var a *testHTTPAuthenticator - if auth == "external" { - a = newTestHTTPAuthenticator(t, "rtsp", "publish") - } - - medi := testMediaH264 - - source := gortsplib.Client{} - - err := source.StartRecording( - "rtsp://testpublisher:testpass@127.0.0.1:8554/teststream?param=value", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source.Close() - - if auth == "external" { - a.close() - a = newTestHTTPAuthenticator(t, "rtsp", "read") - defer a.close() - } - - reader := gortsplib.Client{} - - u, err := base.ParseURL("rtsp://testreader:testpass@127.0.0.1:8554/teststream?param=value") - require.NoError(t, err) - - err = reader.Start(u.Scheme, u.Host) - require.NoError(t, err) - defer reader.Close() - - desc, _, err := reader.Describe(u) - require.NoError(t, err) - - err = reader.SetupAll(desc.BaseURL, desc.Medias) - require.NoError(t, err) - - _, err = reader.Play(nil) - require.NoError(t, err) - }) - } -} - -func TestRTSPServerAuthHashedSHA256(t *testing.T) { - p, ok := newInstance( - "rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishUser: sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ=\n" + - " publishPass: sha256:E9JJ8stBJ7QM+nV4ZoUCeHk/gU3tPFh/5YieiJp6n2w=\n") - require.Equal(t, true, ok) - defer p.Close() - - medi := testMediaH264 - - source := gortsplib.Client{} - - err := source.StartRecording( - "rtsp://testuser:testpass@127.0.0.1:8554/test/stream", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source.Close() -} - -func TestRTSPServerAuthHashedArgon2(t *testing.T) { - p, ok := newInstance( - "rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishUser: argon2:$argon2id$v=19$m=4096,t=3,p=1$MTIzNDU2Nzg$Ux/LWeTgJQPyfMMJo1myR64+o8rALHoPmlE1i/TR+58\n" + - " publishPass: argon2:$argon2i$v=19$m=4096,t=3,p=1$MTIzNDU2Nzg$/mrZ42TiTv1mcPnpMUera5oi0SFYbbyueAbdx5sUvWo\n") - require.Equal(t, true, ok) - defer p.Close() - - medi := testMediaH264 - - source := gortsplib.Client{} - - err := source.StartRecording( - "rtsp://testuser:testpass@127.0.0.1:8554/test/stream", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source.Close() -} - -func TestRTSPServerAuthFail(t *testing.T) { - for _, ca := range []struct { - name string - user string - pass string - }{ - { - "wronguser", - "test1user", - "testpass", - }, - { - "wrongpass", - "testuser", - "test1pass", - }, - { - "wrongboth", - "test1user", - "test1pass", - }, - } { - t.Run("publish_"+ca.name, func(t *testing.T) { - p, ok := newInstance("rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishUser: testuser\n" + - " publishPass: testpass\n") - require.Equal(t, true, ok) - defer p.Close() - - medi := testMediaH264 - - c := gortsplib.Client{} - - err := c.StartRecording( - "rtsp://"+ca.user+":"+ca.pass+"@localhost:8554/test/stream", - &description.Session{Medias: []*description.Media{medi}}, - ) - require.EqualError(t, err, "bad status code: 401 (Unauthorized)") - }) - } - - for _, ca := range []struct { - name string - user string - pass string - }{ - { - "wronguser", - "test1user", - "testpass", - }, - { - "wrongpass", - "testuser", - "test1pass", - }, - { - "wrongboth", - "test1user", - "test1pass", - }, - } { - t.Run("read_"+ca.name, func(t *testing.T) { - p, ok := newInstance("rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " readUser: testuser\n" + - " readPass: testpass\n") - require.Equal(t, true, ok) - defer p.Close() - - c := gortsplib.Client{} - - u, err := base.ParseURL("rtsp://" + ca.user + ":" + ca.pass + "@localhost:8554/test/stream") - require.NoError(t, err) - - err = c.Start(u.Scheme, u.Host) - require.NoError(t, err) - defer c.Close() - - _, _, err = c.Describe(u) - require.EqualError(t, err, "bad status code: 401 (Unauthorized)") - }) - } - - t.Run("ip", func(t *testing.T) { - p, ok := newInstance("rtmp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishIPs: [128.0.0.1/32]\n") - require.Equal(t, true, ok) - defer p.Close() - - medi := testMediaH264 - - c := gortsplib.Client{} - - err := c.StartRecording( - "rtsp://localhost:8554/test/stream", - &description.Session{Medias: []*description.Media{medi}}, - ) - require.EqualError(t, err, "bad status code: 401 (Unauthorized)") - }) - - t.Run("external", func(t *testing.T) { - p, ok := newInstance("externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n") - require.Equal(t, true, ok) - defer p.Close() - - a := newTestHTTPAuthenticator(t, "rtsp", "publish") - defer a.close() - - medi := testMediaH264 - - c := gortsplib.Client{} - - err := c.StartRecording( - "rtsp://testpublisher2:testpass@localhost:8554/teststream?param=value", - &description.Session{Medias: []*description.Media{medi}}, - ) - require.EqualError(t, err, "bad status code: 401 (Unauthorized)") - }) -} - -func TestRTSPServerPublisherOverride(t *testing.T) { - for _, ca := range []string{ - "enabled", - "disabled", - } { - t.Run(ca, func(t *testing.T) { - conf := "rtmp: no\n" + - "paths:\n" + - " all_others:\n" - - if ca == "disabled" { - conf += " overridePublisher: no\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - medi := testMediaH264 - - s1 := gortsplib.Client{} - - err := s1.StartRecording("rtsp://localhost:8554/teststream", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer s1.Close() - - s2 := gortsplib.Client{} - - err = s2.StartRecording("rtsp://localhost:8554/teststream", - &description.Session{Medias: []*description.Media{medi}}) - if ca == "enabled" { - require.NoError(t, err) - defer s2.Close() - } else { - require.Error(t, err) - } - - frameRecv := make(chan struct{}) - - c := gortsplib.Client{} - - u, err := base.ParseURL("rtsp://localhost:8554/teststream") - require.NoError(t, err) - - err = c.Start(u.Scheme, u.Host) - require.NoError(t, err) - defer c.Close() - - desc, _, err := c.Describe(u) - require.NoError(t, err) - - err = c.SetupAll(desc.BaseURL, desc.Medias) - require.NoError(t, err) - - c.OnPacketRTP(desc.Medias[0], desc.Medias[0].Formats[0], func(pkt *rtp.Packet) { - if ca == "enabled" { - require.Equal(t, []byte{5, 15, 16, 17, 18}, pkt.Payload) - } else { - require.Equal(t, []byte{5, 11, 12, 13, 14}, pkt.Payload) - } - close(frameRecv) - }) - - _, err = c.Play(nil) - require.NoError(t, err) - - if ca == "enabled" { - err := s1.Wait() - require.EqualError(t, err, "EOF") - - err = s2.WritePacketRTP(medi, &rtp.Packet{ - Header: rtp.Header{ - Version: 0x02, - PayloadType: 96, - SequenceNumber: 57899, - Timestamp: 345234345, - SSRC: 978651231, - Marker: true, - }, - Payload: []byte{5, 15, 16, 17, 18}, - }) - require.NoError(t, err) - } else { - err = s1.WritePacketRTP(medi, &rtp.Packet{ - Header: rtp.Header{ - Version: 0x02, - PayloadType: 96, - SequenceNumber: 57899, - Timestamp: 345234345, - SSRC: 978651231, - Marker: true, - }, - Payload: []byte{5, 11, 12, 13, 14}, - }) - require.NoError(t, err) - } - - <-frameRecv - }) - } -} diff --git a/internal/defs/path_manager.go b/internal/defs/path_manager.go deleted file mode 100644 index 2e997622..00000000 --- a/internal/defs/path_manager.go +++ /dev/null @@ -1,14 +0,0 @@ -package defs - -import ( - "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/stream" -) - -// PathManager is a path manager. -type PathManager interface { - FindPathConf(req PathFindPathConfReq) (*conf.Path, error) - Describe(req PathDescribeReq) PathDescribeRes - AddPublisher(req PathAddPublisherReq) (Path, error) - AddReader(req PathAddReaderReq) (Path, *stream.Stream, error) -} diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index d9a5603a..28ba042f 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -9,7 +9,6 @@ import ( "github.com/bluenviron/gohlslib" "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -119,12 +118,7 @@ func TestServerNotFound(t *testing.T) { func TestServerRead(t *testing.T) { t.Run("always remux off", func(t *testing.T) { - testMediaH264 := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - } - - desc := &description.Session{Medias: []*description.Media{testMediaH264}} + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( 1460, @@ -194,7 +188,7 @@ func TestServerRead(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) for i := 0; i < 4; i++ { - stream.WriteUnit(testMediaH264, test.FormatH264, &unit.H264{ + stream.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ Base: unit.Base{ NTP: time.Time{}, PTS: time.Duration(i) * time.Second, @@ -210,12 +204,7 @@ func TestServerRead(t *testing.T) { }) t.Run("always remux on", func(t *testing.T) { - testMediaH264 := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - } - - desc := &description.Session{Medias: []*description.Media{testMediaH264}} + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( 1460, @@ -256,7 +245,7 @@ func TestServerRead(t *testing.T) { time.Sleep(100 * time.Millisecond) for i := 0; i < 4; i++ { - stream.WriteUnit(testMediaH264, test.FormatH264, &unit.H264{ + stream.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ Base: unit.Base{ NTP: time.Time{}, PTS: time.Duration(i) * time.Second, diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index f516df2f..aa92e06e 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -188,13 +187,7 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) defer os.Remove(serverKeyFpath) } - - testMediaH264 := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - } - - desc := &description.Session{Medias: []*description.Media{testMediaH264}} + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( 1460, diff --git a/internal/servers/rtsp/conn.go b/internal/servers/rtsp/conn.go index d2155cf2..edd725e6 100644 --- a/internal/servers/rtsp/conn.go +++ b/internal/servers/rtsp/conn.go @@ -32,7 +32,7 @@ type conn struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool - pathManager defs.PathManager + pathManager serverPathManager rconn *gortsplib.ServerConn rserver *gortsplib.Server parent *Server diff --git a/internal/servers/rtsp/server.go b/internal/servers/rtsp/server.go index 1fd1e501..6e41ef00 100644 --- a/internal/servers/rtsp/server.go +++ b/internal/servers/rtsp/server.go @@ -21,6 +21,7 @@ import ( "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" ) // ErrConnNotFound is returned when a connection is not found. @@ -45,6 +46,12 @@ func printAddresses(srv *gortsplib.Server) string { return strings.Join(ret, ", ") } +type serverPathManager interface { + Describe(req defs.PathDescribeReq) defs.PathDescribeRes + AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) + AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) +} + type serverParent interface { logger.Writer } @@ -72,7 +79,7 @@ type Server struct { RunOnConnectRestart bool RunOnDisconnect string ExternalCmdPool *externalcmd.Pool - PathManager defs.PathManager + PathManager serverPathManager Parent serverParent ctx context.Context diff --git a/internal/servers/rtsp/server_test.go b/internal/servers/rtsp/server_test.go new file mode 100644 index 00000000..74e51b66 --- /dev/null +++ b/internal/servers/rtsp/server_test.go @@ -0,0 +1,264 @@ +package rtsp + +import ( + "testing" + "time" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/headers" + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/externalcmd" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +type dummyPath struct { + stream *stream.Stream + streamCreated chan struct{} +} + +func (p *dummyPath) Name() string { + return "teststream" +} + +func (p *dummyPath) SafeConf() *conf.Path { + return &conf.Path{} +} + +func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { + return externalcmd.Environment{} +} + +func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { + var err error + p.stream, err = stream.New( + 1460, + req.Desc, + true, + test.NilLogger{}, + ) + if err != nil { + return nil, err + } + close(p.streamCreated) + return p.stream, nil +} + +func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) { +} + +func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) { +} + +func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) { +} + +type dummyPathManager struct { + path *dummyPath +} + +func (pm *dummyPathManager) Describe(_ defs.PathDescribeReq) defs.PathDescribeRes { + return defs.PathDescribeRes{ + Path: pm.path, + Stream: pm.path.stream, + Redirect: "", + Err: nil, + } +} + +func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) { + return pm.path, nil +} + +func (pm *dummyPathManager) AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + return pm.path, pm.path.stream, nil +} + +func TestServerPublish(t *testing.T) { + path := &dummyPath{ + streamCreated: make(chan struct{}), + } + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:8557", + AuthMethods: []headers.AuthMethod{headers.AuthBasic}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + UseUDP: false, + UseMulticast: false, + RTPAddress: "", + RTCPAddress: "", + MulticastIPRange: "", + MulticastRTPPort: 0, + MulticastRTCPPort: 0, + IsTLS: false, + ServerCert: "", + ServerKey: "", + RTSPAddress: "", + Protocols: map[conf.Protocol]struct{}{conf.Protocol(gortsplib.TransportTCP): {}}, + RunOnConnect: "", + RunOnConnectRestart: false, + RunOnDisconnect: "", + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: &test.NilLogger{}, + } + err := s.Initialize() + require.NoError(t, err) + defer s.Close() + + source := gortsplib.Client{} + + media0 := test.UniqueMediaH264() + + err = source.StartRecording( + "rtsp://testpublisher:testpass@127.0.0.1:8557/teststream?param=value", + &description.Session{Medias: []*description.Media{media0}}) + require.NoError(t, err) + defer source.Close() + + <-path.streamCreated + + aw := asyncwriter.New(512, &test.NilLogger{}) + + recv := make(chan struct{}) + + path.stream.AddReader(aw, + path.stream.Desc().Medias[0], + path.stream.Desc().Medias[0].Formats[0], + func(u unit.Unit) error { + require.Equal(t, [][]byte{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2, 3, 4}, + }, u.(*unit.H264).AU) + close(recv) + return nil + }) + + err = source.WritePacketRTP(media0, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{5, 2, 3, 4}, + }) + require.NoError(t, err) + + aw.Start() + <-recv + aw.Stop() +} + +func TestServerRead(t *testing.T) { + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} + + stream, err := stream.New( + 1460, + desc, + true, + test.NilLogger{}, + ) + require.NoError(t, err) + + path := &dummyPath{stream: stream} + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:8557", + AuthMethods: []headers.AuthMethod{headers.AuthBasic}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + UseUDP: false, + UseMulticast: false, + RTPAddress: "", + RTCPAddress: "", + MulticastIPRange: "", + MulticastRTPPort: 0, + MulticastRTCPPort: 0, + IsTLS: false, + ServerCert: "", + ServerKey: "", + RTSPAddress: "", + Protocols: map[conf.Protocol]struct{}{conf.Protocol(gortsplib.TransportTCP): {}}, + RunOnConnect: "", + RunOnConnectRestart: false, + RunOnDisconnect: "", + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: &test.NilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + reader := gortsplib.Client{} + + u, err := base.ParseURL("rtsp://testreader:testpass@127.0.0.1:8557/teststream?param=value") + require.NoError(t, err) + + err = reader.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer reader.Close() + + desc2, _, err := reader.Describe(u) + require.NoError(t, err) + + err = reader.SetupAll(desc2.BaseURL, desc2.Medias) + require.NoError(t, err) + + recv := make(chan struct{}) + + reader.OnPacketRTPAny(func(m *description.Media, f format.Format, p *rtp.Packet) { + require.Equal(t, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: p.SequenceNumber, + Timestamp: 0, + SSRC: p.SSRC, + CSRC: []uint32{}, + }, + Payload: []byte{ + 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, + 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, + 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, + 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, + 0x07, 0x08, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04, + }, + }, p) + close(recv) + }) + + _, err = reader.Play(nil) + require.NoError(t, err) + + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ + Base: unit.Base{ + NTP: time.Time{}, + }, + AU: [][]byte{ + {5, 2, 3, 4}, // IDR + }, + }) + + <-recv +} diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 2bd95715..2581b43b 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -29,7 +29,7 @@ type session struct { rconn *gortsplib.ServerConn rserver *gortsplib.Server externalCmdPool *externalcmd.Pool - pathManager defs.PathManager + pathManager serverPathManager parent *Server uuid uuid.UUID diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index 0ae7a1c3..7c637e1f 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" @@ -167,12 +166,7 @@ func TestServerRead(t *testing.T) { externalCmdPool := externalcmd.NewPool() defer externalCmdPool.Close() - testMediaH264 := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - } - - desc := &description.Session{Medias: []*description.Media{testMediaH264}} + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( 1460, diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index e15f92dc..f6e9af06 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -89,7 +88,7 @@ func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *st func TestServerStaticPages(t *testing.T) { s := &Server{ - Address: "127.0.0.1:8889", + Address: "127.0.0.1:8886", Encryption: false, ServerKey: "", ServerCert: "", @@ -115,7 +114,7 @@ func TestServerStaticPages(t *testing.T) { for _, path := range []string{"/stream", "/stream/publish", "/publish"} { func() { - req, err := http.NewRequest(http.MethodGet, "http://localhost:8889"+path, nil) + req, err := http.NewRequest(http.MethodGet, "http://localhost:8886"+path, nil) require.NoError(t, err) res, err := hc.Do(req) @@ -135,7 +134,7 @@ func TestServerPublish(t *testing.T) { pathManager := &dummyPathManager{path: path} s := &Server{ - Address: "127.0.0.1:8889", + Address: "127.0.0.1:8886", Encryption: false, ServerKey: "", ServerCert: "", @@ -161,7 +160,7 @@ func TestServerPublish(t *testing.T) { // preflight requests must always work, without authentication func() { - req, err := http.NewRequest(http.MethodOptions, "http://localhost:8889/teststream/whip", nil) + req, err := http.NewRequest(http.MethodOptions, "http://localhost:8886/teststream/whip", nil) require.NoError(t, err) req.Header.Set("Access-Control-Request-Method", "OPTIONS") @@ -177,7 +176,7 @@ func TestServerPublish(t *testing.T) { }() ur := "http://" - ur += "localhost:8889/teststream/whip?param=value" + ur += "localhost:8886/teststream/whip?param=value" su, err := url.Parse(ur) require.NoError(t, err) @@ -216,7 +215,7 @@ func TestServerPublish(t *testing.T) { path.stream.Desc().Medias[0].Formats[0], func(u unit.Unit) error { require.Equal(t, [][]byte{ - {2}, + {1}, }, u.(*unit.H264).AU) close(recv) return nil @@ -231,7 +230,7 @@ func TestServerPublish(t *testing.T) { Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{2}, + Payload: []byte{1}, }) require.NoError(t, err) @@ -241,17 +240,7 @@ func TestServerPublish(t *testing.T) { } func TestServerRead(t *testing.T) { - testMediaH264 := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, - } - - /*testMediaAAC := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatMPEG4Audio}, - }*/ - - desc := &description.Session{Medias: []*description.Media{testMediaH264}} + desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( 1460, @@ -266,7 +255,7 @@ func TestServerRead(t *testing.T) { pathManager := &dummyPathManager{path: path} s := &Server{ - Address: "127.0.0.1:8889", + Address: "127.0.0.1:8886", Encryption: false, ServerKey: "", ServerCert: "", @@ -289,7 +278,7 @@ func TestServerRead(t *testing.T) { defer s.Close() ur := "http://" - ur += "localhost:8889/teststream/whep?param=value" + ur += "localhost:8886/teststream/whep?param=value" u, err := url.Parse(ur) require.NoError(t, err) @@ -357,7 +346,7 @@ func TestServerReadNotFound(t *testing.T) { pathManager := &dummyPathManager{} s := &Server{ - Address: "127.0.0.1:8889", + Address: "127.0.0.1:8886", Encryption: false, ServerKey: "", ServerCert: "", @@ -381,7 +370,7 @@ func TestServerReadNotFound(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, "http://localhost:8889/nonexisting/whep") + iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, "http://localhost:8886/nonexisting/whep") require.NoError(t, err) pc, err := pwebrtc.NewPeerConnection(pwebrtc.Configuration{ @@ -397,7 +386,7 @@ func TestServerReadNotFound(t *testing.T) { require.NoError(t, err) req, err := http.NewRequest(http.MethodPost, - "http://localhost:8889/nonexisting/whep", bytes.NewReader([]byte(offer.SDP))) + "http://localhost:8886/nonexisting/whep", bytes.NewReader([]byte(offer.SDP))) require.NoError(t, err) req.Header.Set("Content-Type", "application/sdp") diff --git a/internal/staticsources/rtsp/source_test.go b/internal/staticsources/rtsp/source_test.go index 53733c1a..ebdb3e99 100644 --- a/internal/staticsources/rtsp/source_test.go +++ b/internal/staticsources/rtsp/source_test.go @@ -10,7 +10,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" "github.com/stretchr/testify/require" @@ -38,11 +37,6 @@ func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo return sh.onPlay(ctx) } -var testMediaH264 = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{test.FormatH264}, -} - func TestSource(t *testing.T) { for _, source := range []string{ "udp", @@ -55,6 +49,8 @@ func TestSource(t *testing.T) { nonce, err := auth.GenerateNonce() require.NoError(t, err) + media0 := test.UniqueMediaH264() + s := gortsplib.Server{ Handler: &testServer{ onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -81,7 +77,7 @@ func TestSource(t *testing.T) { onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(100 * time.Millisecond) - err := stream.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := stream.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 0x02, PayloadType: 96, @@ -127,7 +123,7 @@ func TestSource(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{testMediaH264}}) + stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{media0}}) defer stream.Close() var te *test.SourceTester @@ -180,6 +176,8 @@ func TestRTSPSourceNoPassword(t *testing.T) { nonce, err := auth.GenerateNonce() require.NoError(t, err) + media0 := test.UniqueMediaH264() + s := gortsplib.Server{ Handler: &testServer{ onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { @@ -200,7 +198,7 @@ func TestRTSPSourceNoPassword(t *testing.T) { onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { go func() { time.Sleep(100 * time.Millisecond) - err := stream.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := stream.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 0x02, PayloadType: 96, @@ -231,7 +229,7 @@ func TestRTSPSourceNoPassword(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{testMediaH264}}) + stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{media0}}) defer stream.Close() var sp conf.RTSPTransport @@ -261,6 +259,8 @@ func TestRTSPSourceRange(t *testing.T) { t.Run(ca, func(t *testing.T) { var stream *gortsplib.ServerStream + media0 := test.UniqueMediaH264() + s := gortsplib.Server{ Handler: &testServer{ onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { @@ -287,7 +287,7 @@ func TestRTSPSourceRange(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - err := stream.WritePacketRTP(testMediaH264, &rtp.Packet{ + err := stream.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 0x02, PayloadType: 96, @@ -313,7 +313,7 @@ func TestRTSPSourceRange(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{testMediaH264}}) + stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{media0}}) defer stream.Close() cnf := &conf.Path{} diff --git a/internal/test/medias.go b/internal/test/medias.go new file mode 100644 index 00000000..84b37520 --- /dev/null +++ b/internal/test/medias.go @@ -0,0 +1,28 @@ +package test + +import ( + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" +) + +// MediaH264 is a test H264 media. +var MediaH264 = UniqueMediaH264() + +// MediaMPEG4Audio is a test MPEG-4 audio media. +var MediaMPEG4Audio = UniqueMediaMPEG4Audio() + +// UniqueMediaH264 is a test H264 media. +func UniqueMediaH264() *description.Media { + return &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{FormatH264}, + } +} + +// UniqueMediaMPEG4Audio is a test MPEG-4 audio media. +func UniqueMediaMPEG4Audio() *description.Media { + return &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{FormatMPEG4Audio}, + } +}