Browse Source

unify authentication mechanisms (#1775)

pull/1778/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
d8678cef90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      go.mod
  2. 4
      go.sum
  3. 15
      internal/conf/conf.go
  4. 27
      internal/conf/path.go
  5. 5
      internal/conf/yaml/load.go
  6. 183
      internal/core/authentication.go
  7. 14
      internal/core/core.go
  8. 71
      internal/core/external_auth.go
  9. 83
      internal/core/hls_muxer.go
  10. 70
      internal/core/path.go
  11. 134
      internal/core/path_manager.go
  12. 149
      internal/core/rtmp_conn.go
  13. 62
      internal/core/rtmp_server.go
  14. 235
      internal/core/rtsp_conn.go
  15. 56
      internal/core/rtsp_server.go
  16. 15
      internal/core/rtsp_server_test.go
  17. 91
      internal/core/rtsp_session.go
  18. 9
      internal/core/webrtc_conn.go
  19. 161
      internal/core/webrtc_server.go
  20. 3
      internal/websocket/serverconn.go
  21. 5
      mediamtx.yml

2
go.mod

@ -8,7 +8,7 @@ require ( @@ -8,7 +8,7 @@ require (
github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.2.3
github.com/bluenviron/gortsplib/v3 v3.5.0
github.com/bluenviron/gortsplib/v3 v3.6.1
github.com/bluenviron/mediacommon v0.5.0
github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.0

4
go.sum

@ -14,8 +14,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z @@ -14,8 +14,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.2.3 h1:vZmpjh2qWHaCvwwha04tgu8Kz9p4CuSBRLayD2yf89A=
github.com/bluenviron/gohlslib v0.2.3/go.mod h1:loD97sTtBh/nBcw8yZJgXc71A6XQb0FsDWXFRkl7Yj4=
github.com/bluenviron/gortsplib/v3 v3.5.0 h1:8d6DYcwVhghObgBFOnoJwK6xf1ZiAQ8Vi7DRv6DGLdw=
github.com/bluenviron/gortsplib/v3 v3.5.0/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A=
github.com/bluenviron/gortsplib/v3 v3.6.1 h1:+/kPiwmdRwUasU5thOBATJQ4/yD+vrIEutJyRTB/f+0=
github.com/bluenviron/gortsplib/v3 v3.6.1/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A=
github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY=
github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=

15
internal/conf/conf.go

@ -195,6 +195,15 @@ func (conf Conf) Clone() *Conf { @@ -195,6 +195,15 @@ func (conf Conf) Clone() *Conf {
return &dest
}
func contains(list []headers.AuthMethod, item headers.AuthMethod) bool {
for _, i := range list {
if i == item {
return true
}
}
return false
}
// Check checks the configuration for errors.
func (conf *Conf) Check() error {
// general
@ -209,6 +218,10 @@ func (conf *Conf) Check() error { @@ -209,6 +218,10 @@ func (conf *Conf) Check() error {
!strings.HasPrefix(conf.ExternalAuthenticationURL, "https://") {
return fmt.Errorf("'externalAuthenticationURL' must be a HTTP URL")
}
if contains(conf.AuthMethods, headers.AuthDigest) {
return fmt.Errorf("'externalAuthenticationURL' can't be used when 'digest' is in authMethods")
}
}
// RTSP
@ -280,7 +293,7 @@ func (conf *Conf) UnmarshalJSON(b []byte) error { @@ -280,7 +293,7 @@ func (conf *Conf) UnmarshalJSON(b []byte) error {
conf.MulticastRTCPPort = 8003
conf.ServerKey = "server.key"
conf.ServerCert = "server.crt"
conf.AuthMethods = AuthMethods{headers.AuthBasic, headers.AuthDigest}
conf.AuthMethods = AuthMethods{headers.AuthBasic}
// RTMP
conf.RTMPAddress = ":1935"

27
internal/conf/path.go

@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/headers"
"github.com/bluenviron/gortsplib/v3/pkg/url"
)
@ -252,30 +253,32 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -252,30 +253,32 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
"the stream is not provided by a publisher, but by a fixed source")
}
if pconf.PublishUser != "" && conf.ExternalAuthenticationURL != "" {
return fmt.Errorf("'publishUser' can't be used with 'externalAuthenticationURL'")
}
if len(pconf.PublishIPs) > 0 && pconf.Source != "publisher" {
return fmt.Errorf("'publishIPs' is useless when source is not 'publisher', since " +
"the stream is not provided by a publisher, but by a fixed source")
}
if len(pconf.PublishIPs) > 0 && conf.ExternalAuthenticationURL != "" {
return fmt.Errorf("'publishIPs' can't be used with 'externalAuthenticationURL'")
}
if (pconf.ReadUser != "" && pconf.ReadPass == "") ||
(pconf.ReadUser == "" && pconf.ReadPass != "") {
return fmt.Errorf("read username and password must be both filled")
}
if pconf.ReadUser != "" && conf.ExternalAuthenticationURL != "" {
return fmt.Errorf("'readUser' can't be used with 'externalAuthenticationURL'")
if contains(conf.AuthMethods, headers.AuthDigest) {
if strings.HasPrefix(string(pconf.PublishUser), "sha256:") ||
strings.HasPrefix(string(pconf.PublishPass), "sha256:") ||
strings.HasPrefix(string(pconf.ReadUser), "sha256:") ||
strings.HasPrefix(string(pconf.ReadPass), "sha256:") {
return fmt.Errorf("hashed credentials can't be used when the digest auth method is available")
}
}
if len(pconf.ReadIPs) > 0 && conf.ExternalAuthenticationURL != "" {
return fmt.Errorf("'readIPs' can't be used with 'externalAuthenticationURL'")
if conf.ExternalAuthenticationURL != "" {
if pconf.PublishUser != "" ||
len(pconf.PublishIPs) > 0 ||
pconf.ReadUser != "" ||
len(pconf.ReadIPs) > 0 {
return fmt.Errorf("credentials or IPs can't be used together with 'externalAuthenticationURL'")
}
}
if pconf.RunOnInit != "" && pconf.Regexp != nil {

5
internal/conf/yaml/load.go

@ -2,7 +2,6 @@ @@ -2,7 +2,6 @@
package yaml
import (
"bytes"
"encoding/json"
"fmt"
@ -64,7 +63,5 @@ func Load(buf []byte, dest interface{}) error { @@ -64,7 +63,5 @@ func Load(buf []byte, dest interface{}) error {
}
// load JSON into destination
d := json.NewDecoder(bytes.NewReader(buf))
d.DisallowUnknownFields()
return d.Decode(dest)
return json.Unmarshal(buf, dest)
}

183
internal/core/authentication.go

@ -0,0 +1,183 @@ @@ -0,0 +1,183 @@
package core
import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"github.com/bluenviron/gortsplib/v3/pkg/auth"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/headers"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/google/uuid"
"github.com/aler9/mediamtx/internal/conf"
)
func sha256Base64(in string) string {
h := sha256.New()
h.Write([]byte(in))
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
func checkCredential(right string, guess string) bool {
if strings.HasPrefix(right, "sha256:") {
return right[len("sha256:"):] == sha256Base64(guess)
}
return right == guess
}
type authProtocol string
const (
authProtocolRTSP authProtocol = "rtsp"
authProtocolRTMP authProtocol = "rtmp"
authProtocolHLS authProtocol = "hls"
authProtocolWebRTC authProtocol = "webrtc"
)
func externalAuth(
ur string,
ip string,
user string,
password string,
path string,
protocol authProtocol,
id *uuid.UUID,
publish bool,
query string,
) error {
enc, _ := json.Marshal(struct {
IP string `json:"ip"`
User string `json:"user"`
Password string `json:"password"`
Path string `json:"path"`
Protocol string `json:"protocol"`
ID *uuid.UUID `json:"id"`
Action string `json:"action"`
Query string `json:"query"`
}{
IP: ip,
User: user,
Password: password,
Path: path,
Protocol: string(protocol),
Action: func() string {
if publish {
return "publish"
}
return "read"
}(),
Query: query,
})
res, err := http.Post(ur, "application/json", bytes.NewReader(enc))
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode < 200 || res.StatusCode > 299 {
if resBody, err := io.ReadAll(res.Body); err == nil && len(resBody) != 0 {
return fmt.Errorf("external authentication replied with code %d: %s", res.StatusCode, string(resBody))
}
return fmt.Errorf("external authentication replied with code %d", res.StatusCode)
}
return nil
}
type authCredentials struct {
query string
ip net.IP
user string
pass string
proto authProtocol
id *uuid.UUID
rtspRequest *base.Request
rtspBaseURL *url.URL
rtspNonce string
}
func authenticate(
externalAuthenticationURL string,
rtspAuthMethods conf.AuthMethods,
pathName string,
pathConf *conf.PathConf,
publish bool,
credentials authCredentials,
) error {
var rtspAuth headers.Authorization
if credentials.rtspRequest != nil {
err := rtspAuth.Unmarshal(credentials.rtspRequest.Header["Authorization"])
if err == nil && rtspAuth.Method == headers.AuthBasic {
credentials.user = rtspAuth.BasicUser
credentials.pass = rtspAuth.BasicPass
}
}
if externalAuthenticationURL != "" {
err := externalAuth(
externalAuthenticationURL,
credentials.ip.String(),
credentials.user,
credentials.pass,
pathName,
credentials.proto,
credentials.id,
publish,
credentials.query,
)
if err != nil {
return fmt.Errorf("external authentication failed: %s", err)
}
}
var pathIPs conf.IPsOrCIDRs
var pathUser string
var pathPass string
if publish {
pathIPs = pathConf.PublishIPs
pathUser = string(pathConf.PublishUser)
pathPass = string(pathConf.PublishPass)
} else {
pathIPs = pathConf.ReadIPs
pathUser = string(pathConf.ReadUser)
pathPass = string(pathConf.ReadPass)
}
if pathIPs != nil {
if !ipEqualOrInRange(credentials.ip, pathIPs) {
return fmt.Errorf("IP '%s' not allowed", credentials.ip)
}
}
if pathUser != "" {
if credentials.rtspRequest != nil && rtspAuth.Method == headers.AuthDigest {
err := auth.Validate(
credentials.rtspRequest,
pathUser,
pathPass,
credentials.rtspBaseURL,
rtspAuthMethods,
"IPCAM",
credentials.rtspNonce)
if err != nil {
return err
}
} else if !checkCredential(pathUser, credentials.user) ||
!checkCredential(pathPass, credentials.pass) {
return fmt.Errorf("invalid credentials")
}
}
return nil
}

14
internal/core/core.go

@ -242,7 +242,9 @@ func (p *Core) createResources(initial bool) error { @@ -242,7 +242,9 @@ func (p *Core) createResources(initial bool) error {
if p.pathManager == nil {
p.pathManager = newPathManager(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTSPAddress,
p.conf.AuthMethods,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
@ -262,7 +264,6 @@ func (p *Core) createResources(initial bool) error { @@ -262,7 +264,6 @@ func (p *Core) createResources(initial bool) error {
_, useMulticast := p.conf.Protocols[conf.Protocol(gortsplib.TransportUDPMulticast)]
p.rtspServer, err = newRTSPServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTSPAddress,
p.conf.AuthMethods,
p.conf.ReadTimeout,
@ -299,7 +300,6 @@ func (p *Core) createResources(initial bool) error { @@ -299,7 +300,6 @@ func (p *Core) createResources(initial bool) error {
if p.rtspsServer == nil {
p.rtspsServer, err = newRTSPServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTSPSAddress,
p.conf.AuthMethods,
p.conf.ReadTimeout,
@ -336,7 +336,6 @@ func (p *Core) createResources(initial bool) error { @@ -336,7 +336,6 @@ func (p *Core) createResources(initial bool) error {
if p.rtmpServer == nil {
p.rtmpServer, err = newRTMPServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTMPAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
@ -364,7 +363,6 @@ func (p *Core) createResources(initial bool) error { @@ -364,7 +363,6 @@ func (p *Core) createResources(initial bool) error {
if p.rtmpsServer == nil {
p.rtmpsServer, err = newRTMPServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTMPSAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
@ -420,7 +418,6 @@ func (p *Core) createResources(initial bool) error { @@ -420,7 +418,6 @@ func (p *Core) createResources(initial bool) error {
if p.webRTCServer == nil {
p.webRTCServer, err = newWebRTCServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.WebRTCAddress,
p.conf.WebRTCEncryption,
p.conf.WebRTCServerKey,
@ -490,7 +487,9 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -490,7 +487,9 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.ReadTimeout != p.conf.ReadTimeout
closePathManager := newConf == nil ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
!reflect.DeepEqual(newConf.AuthMethods, p.conf.AuthMethods) ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
@ -503,7 +502,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -503,7 +502,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeRTSPServer := newConf == nil ||
newConf.RTSPDisable != p.conf.RTSPDisable ||
newConf.Encryption != p.conf.Encryption ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
!reflect.DeepEqual(newConf.AuthMethods, p.conf.AuthMethods) ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
@ -525,7 +523,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -525,7 +523,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeRTSPSServer := newConf == nil ||
newConf.RTSPDisable != p.conf.RTSPDisable ||
newConf.Encryption != p.conf.Encryption ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.RTSPSAddress != p.conf.RTSPSAddress ||
!reflect.DeepEqual(newConf.AuthMethods, p.conf.AuthMethods) ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
@ -544,7 +541,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -544,7 +541,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTMPDisable != p.conf.RTMPDisable ||
newConf.RTMPEncryption != p.conf.RTMPEncryption ||
newConf.RTMPAddress != p.conf.RTMPAddress ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
@ -558,7 +554,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -558,7 +554,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTMPDisable != p.conf.RTMPDisable ||
newConf.RTMPEncryption != p.conf.RTMPEncryption ||
newConf.RTMPSAddress != p.conf.RTMPSAddress ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
@ -593,7 +588,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -593,7 +588,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeWebRTCServer := newConf == nil ||
newConf.WebRTCDisable != p.conf.WebRTCDisable ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.WebRTCAddress != p.conf.WebRTCAddress ||
newConf.WebRTCEncryption != p.conf.WebRTCEncryption ||
newConf.WebRTCServerKey != p.conf.WebRTCServerKey ||

71
internal/core/external_auth.go

@ -1,71 +0,0 @@ @@ -1,71 +0,0 @@
package core
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/google/uuid"
)
type externalAuthProto string
const (
externalAuthProtoRTSP externalAuthProto = "rtsp"
externalAuthProtoRTMP externalAuthProto = "rtmp"
externalAuthProtoHLS externalAuthProto = "hls"
externalAuthProtoWebRTC externalAuthProto = "webrtc"
)
func externalAuth(
ur string,
ip string,
user string,
password string,
path string,
protocol externalAuthProto,
id *uuid.UUID,
publish bool,
query string,
) error {
enc, _ := json.Marshal(struct {
IP string `json:"ip"`
User string `json:"user"`
Password string `json:"password"`
Path string `json:"path"`
Protocol string `json:"protocol"`
ID *uuid.UUID `json:"id"`
Action string `json:"action"`
Query string `json:"query"`
}{
IP: ip,
User: user,
Password: password,
Path: path,
Protocol: string(protocol),
Action: func() string {
if publish {
return "publish"
}
return "read"
}(),
Query: query,
})
res, err := http.Post(ur, "application/json", bytes.NewReader(enc))
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode < 200 || res.StatusCode > 299 {
if resBody, err := io.ReadAll(res.Body); err == nil {
return fmt.Errorf("external authentication replied with code %d: %s", res.StatusCode, resBody)
}
return fmt.Errorf("external authentication replied with code %d", res.StatusCode)
}
return nil
}

83
internal/core/hls_muxer.go

@ -267,6 +267,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -267,6 +267,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
res := m.pathManager.readerAdd(pathReaderAddReq{
author: m,
pathName: m.pathName,
skipAuth: true,
})
if res.err != nil {
return res.err
@ -554,13 +555,30 @@ func (m *hlsMuxer) handleRequest(ctx *gin.Context) { @@ -554,13 +555,30 @@ func (m *hlsMuxer) handleRequest(ctx *gin.Context) {
bytesSent: m.bytesSent,
}
err := m.authenticate(ctx)
user, pass, hasCredentials := ctx.Request.BasicAuth()
err := authenticate(
m.externalAuthenticationURL,
nil,
m.pathName,
m.path.safeConf(),
false,
authCredentials{
query: ctx.Request.URL.RawQuery,
ip: net.ParseIP(ctx.ClientIP()),
user: user,
pass: pass,
proto: authProtocolHLS,
},
)
if err != nil {
if terr, ok := err.(pathErrAuthCritical); ok {
m.Log(logger.Info, "authentication error: %s", terr.message)
if !hasCredentials {
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
w.WriteHeader(http.StatusUnauthorized)
return
}
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
m.Log(logger.Info, "authentication error: %s", err)
w.WriteHeader(http.StatusUnauthorized)
return
}
@ -575,63 +593,6 @@ func (m *hlsMuxer) handleRequest(ctx *gin.Context) { @@ -575,63 +593,6 @@ func (m *hlsMuxer) handleRequest(ctx *gin.Context) {
m.muxer.Handle(w, ctx.Request)
}
func (m *hlsMuxer) authenticate(ctx *gin.Context) error {
pathConf := m.path.safeConf()
pathIPs := pathConf.ReadIPs
pathUser := pathConf.ReadUser
pathPass := pathConf.ReadPass
if m.externalAuthenticationURL != "" {
ip := net.ParseIP(ctx.ClientIP())
user, pass, ok := ctx.Request.BasicAuth()
err := externalAuth(
m.externalAuthenticationURL,
ip.String(),
user,
pass,
m.pathName,
externalAuthProtoHLS,
nil,
false,
ctx.Request.URL.RawQuery)
if err != nil {
if !ok {
return pathErrAuthNotCritical{}
}
return pathErrAuthCritical{
message: fmt.Sprintf("external authentication failed: %s", err),
}
}
}
if pathIPs != nil {
ip := net.ParseIP(ctx.ClientIP())
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
message: fmt.Sprintf("IP '%s' not allowed", ip),
}
}
}
if pathUser != "" {
user, pass, ok := ctx.Request.BasicAuth()
if !ok {
return pathErrAuthNotCritical{}
}
if user != string(pathUser) || pass != string(pathPass) {
return pathErrAuthCritical{
message: "invalid credentials",
}
}
}
return nil
}
// processRequest is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) processRequest(req *hlsMuxerRequest) {
select {

70
internal/core/path.go

@ -10,7 +10,6 @@ import ( @@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/url"
@ -25,39 +24,22 @@ func newEmptyTimer() *time.Timer { @@ -25,39 +24,22 @@ func newEmptyTimer() *time.Timer {
return t
}
type authenticateFunc func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error
type pathErrNoOnePublishing struct {
pathName string
}
// Error implements the error interface.
func (e pathErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.pathName)
}
type pathErrAuthNotCritical struct {
message string
response *base.Response
type pathErrAuth struct {
wrapped error
}
// Error implements the error interface.
func (pathErrAuthNotCritical) Error() string {
return "non-critical authentication error"
func (e pathErrAuth) Error() string {
return "authentication error"
}
type pathErrAuthCritical struct {
message string
response *base.Response
type pathErrNoOnePublishing struct {
pathName string
}
// Error implements the error interface.
func (pathErrAuthCritical) Error() string {
return "critical authentication error"
func (e pathErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.pathName)
}
type pathParent interface {
@ -101,6 +83,17 @@ type pathPublisherRemoveReq struct { @@ -101,6 +83,17 @@ type pathPublisherRemoveReq struct {
res chan struct{}
}
type pathGetPathConfRes struct {
conf *conf.PathConf
err error
}
type pathGetPathConfReq struct {
name string
credentials authCredentials
res chan pathGetPathConfRes
}
type pathDescribeRes struct {
path *path
stream *stream
@ -109,10 +102,10 @@ type pathDescribeRes struct { @@ -109,10 +102,10 @@ type pathDescribeRes struct {
}
type pathDescribeReq struct {
pathName string
url *url.URL
authenticate authenticateFunc
res chan pathDescribeRes
pathName string
url *url.URL
credentials authCredentials
res chan pathDescribeRes
}
type pathReaderSetupPlayRes struct {
@ -122,10 +115,11 @@ type pathReaderSetupPlayRes struct { @@ -122,10 +115,11 @@ type pathReaderSetupPlayRes struct {
}
type pathReaderAddReq struct {
author reader
pathName string
authenticate authenticateFunc
res chan pathReaderSetupPlayRes
author reader
pathName string
skipAuth bool
credentials authCredentials
res chan pathReaderSetupPlayRes
}
type pathPublisherAnnounceRes struct {
@ -134,10 +128,10 @@ type pathPublisherAnnounceRes struct { @@ -134,10 +128,10 @@ type pathPublisherAnnounceRes struct {
}
type pathPublisherAddReq struct {
author publisher
pathName string
authenticate authenticateFunc
res chan pathPublisherAnnounceRes
author publisher
pathName string
credentials authCredentials
res chan pathPublisherAnnounceRes
}
type pathPublisherRecordRes struct {

134
internal/core/path_manager.go

@ -39,15 +39,17 @@ type pathManagerParent interface { @@ -39,15 +39,17 @@ type pathManagerParent interface {
}
type pathManager struct {
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
udpMaxPayloadSize int
pathConfs map[string]*conf.PathConf
externalCmdPool *externalcmd.Pool
metrics *metrics
parent pathManagerParent
externalAuthenticationURL string
rtspAddress string
authMethods conf.AuthMethods
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
udpMaxPayloadSize int
pathConfs map[string]*conf.PathConf
externalCmdPool *externalcmd.Pool
metrics *metrics
parent pathManagerParent
ctx context.Context
ctxCancel func()
@ -61,6 +63,7 @@ type pathManager struct { @@ -61,6 +63,7 @@ type pathManager struct {
chPathClose chan *path
chPathSourceReady chan *path
chPathSourceNotReady chan *path
chPathGetPathConf chan pathGetPathConfReq
chDescribe chan pathDescribeReq
chReaderAdd chan pathReaderAddReq
chPublisherAdd chan pathPublisherAddReq
@ -70,7 +73,9 @@ type pathManager struct { @@ -70,7 +73,9 @@ type pathManager struct {
func newPathManager(
parentCtx context.Context,
externalAuthenticationURL string,
rtspAddress string,
authMethods conf.AuthMethods,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
@ -83,28 +88,31 @@ func newPathManager( @@ -83,28 +88,31 @@ func newPathManager(
ctx, ctxCancel := context.WithCancel(parentCtx)
pm := &pathManager{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
udpMaxPayloadSize: udpMaxPayloadSize,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path),
pathsByConf: make(map[string]map[*path]struct{}),
chConfReload: make(chan map[string]*conf.PathConf),
chPathClose: make(chan *path),
chPathSourceReady: make(chan *path),
chPathSourceNotReady: make(chan *path),
chDescribe: make(chan pathDescribeReq),
chReaderAdd: make(chan pathReaderAddReq),
chPublisherAdd: make(chan pathPublisherAddReq),
chHLSServerSet: make(chan pathManagerHLSServer),
chAPIPathsList: make(chan pathAPIPathsListReq),
externalAuthenticationURL: externalAuthenticationURL,
rtspAddress: rtspAddress,
authMethods: authMethods,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
udpMaxPayloadSize: udpMaxPayloadSize,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path),
pathsByConf: make(map[string]map[*path]struct{}),
chConfReload: make(chan map[string]*conf.PathConf),
chPathClose: make(chan *path),
chPathSourceReady: make(chan *path),
chPathSourceNotReady: make(chan *path),
chPathGetPathConf: make(chan pathGetPathConfReq),
chDescribe: make(chan pathDescribeReq),
chReaderAdd: make(chan pathReaderAddReq),
chPublisherAdd: make(chan pathPublisherAddReq),
chHLSServerSet: make(chan pathManagerHLSServer),
chAPIPathsList: make(chan pathAPIPathsListReq),
}
for pathConfName, pathConf := range pm.pathConfs {
@ -194,22 +202,32 @@ outer: @@ -194,22 +202,32 @@ outer:
pm.hlsServer.pathSourceNotReady(pa)
}
case req := <-pm.chPathGetPathConf:
_, pathConf, _, err := pm.getPathConfInternal(req.name)
if err != nil {
req.res <- pathGetPathConfRes{err: err}
continue
}
err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.name, pathConf, false, req.credentials)
if err != nil {
req.res <- pathGetPathConfRes{err: pathErrAuth{wrapped: err}}
continue
}
req.res <- pathGetPathConfRes{conf: pathConf}
case req := <-pm.chDescribe:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
pathConfName, pathConf, pathMatches, err := pm.getPathConfInternal(req.pathName)
if err != nil {
req.res <- pathDescribeRes{err: err}
continue
}
if req.authenticate != nil {
err = req.authenticate(
pathConf.ReadIPs,
pathConf.ReadUser,
pathConf.ReadPass)
if err != nil {
req.res <- pathDescribeRes{err: err}
continue
}
err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if err != nil {
req.res <- pathDescribeRes{err: pathErrAuth{wrapped: err}}
continue
}
// create path if it doesn't exist
@ -220,19 +238,16 @@ outer: @@ -220,19 +238,16 @@ outer:
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
case req := <-pm.chReaderAdd:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
pathConfName, pathConf, pathMatches, err := pm.getPathConfInternal(req.pathName)
if err != nil {
req.res <- pathReaderSetupPlayRes{err: err}
continue
}
if req.authenticate != nil {
err = req.authenticate(
pathConf.ReadIPs,
pathConf.ReadUser,
pathConf.ReadPass)
if !req.skipAuth {
err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if err != nil {
req.res <- pathReaderSetupPlayRes{err: err}
req.res <- pathReaderSetupPlayRes{err: pathErrAuth{wrapped: err}}
continue
}
}
@ -245,18 +260,15 @@ outer: @@ -245,18 +260,15 @@ outer:
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]}
case req := <-pm.chPublisherAdd:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
pathConfName, pathConf, pathMatches, err := pm.getPathConfInternal(req.pathName)
if err != nil {
req.res <- pathPublisherAnnounceRes{err: err}
continue
}
err = req.authenticate(
pathConf.PublishIPs,
pathConf.PublishUser,
pathConf.PublishPass)
err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if err != nil {
req.res <- pathPublisherAnnounceRes{err: err}
req.res <- pathPublisherAnnounceRes{err: pathErrAuth{wrapped: err}}
continue
}
@ -330,7 +342,7 @@ func (pm *pathManager) removePath(pa *path) { @@ -330,7 +342,7 @@ func (pm *pathManager) removePath(pa *path) {
delete(pm.paths, pa.name)
}
func (pm *pathManager) findPathConf(name string) (string, *conf.PathConf, []string, error) {
func (pm *pathManager) getPathConfInternal(name string) (string, *conf.PathConf, []string, error) {
err := conf.IsValidPathName(name)
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
@ -389,6 +401,18 @@ func (pm *pathManager) onPathClose(pa *path) { @@ -389,6 +401,18 @@ func (pm *pathManager) onPathClose(pa *path) {
}
}
// getPathConf is called by a reader or publisher.
func (pm *pathManager) getPathConf(req pathGetPathConfReq) pathGetPathConfRes {
req.res = make(chan pathGetPathConfRes)
select {
case pm.chPathGetPathConf <- req:
return <-req.res
case <-pm.ctx.Done():
return pathGetPathConfRes{err: fmt.Errorf("terminated")}
}
}
// describe is called by a reader or publisher.
func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes {
req.res = make(chan pathDescribeRes)

149
internal/core/rtmp_conn.go

@ -196,20 +196,19 @@ type rtmpConnParent interface { @@ -196,20 +196,19 @@ type rtmpConnParent interface {
}
type rtmpConn struct {
isTLS bool
externalAuthenticationURL string
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
runOnConnect string
runOnConnectRestart bool
wg *sync.WaitGroup
conn *rtmp.Conn
nconn net.Conn
externalCmdPool *externalcmd.Pool
pathManager rtmpConnPathManager
parent rtmpConnParent
isTLS bool
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
runOnConnect string
runOnConnectRestart bool
wg *sync.WaitGroup
conn *rtmp.Conn
nconn net.Conn
externalCmdPool *externalcmd.Pool
pathManager rtmpConnPathManager
parent rtmpConnParent
ctx context.Context
ctxCancel func()
@ -222,7 +221,6 @@ type rtmpConn struct { @@ -222,7 +221,6 @@ type rtmpConn struct {
func newRTMPConn(
parentCtx context.Context,
isTLS bool,
externalAuthenticationURL string,
rtspAddress string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
@ -238,24 +236,23 @@ func newRTMPConn( @@ -238,24 +236,23 @@ func newRTMPConn(
ctx, ctxCancel := context.WithCancel(parentCtx)
c := &rtmpConn{
isTLS: isTLS,
externalAuthenticationURL: externalAuthenticationURL,
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
wg: wg,
conn: rtmp.NewConn(nconn),
nconn: nconn,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
uuid: uuid.New(),
created: time.Now(),
isTLS: isTLS,
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
wg: wg,
conn: rtmp.NewConn(nconn),
nconn: nconn,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
uuid: uuid.New(),
created: time.Now(),
}
c.Log(logger.Info, "opened")
@ -344,12 +341,12 @@ func (c *rtmpConn) runInner(ctx context.Context) error { @@ -344,12 +341,12 @@ func (c *rtmpConn) runInner(ctx context.Context) error {
c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout)))
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
u, isPublishing, err := c.conn.InitializeServer()
u, publish, err := c.conn.InitializeServer()
if err != nil {
return err
}
if !isPublishing {
if !publish {
return c.runRead(ctx, u)
}
return c.runPublish(ctx, u)
@ -361,20 +358,21 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -361,20 +358,21 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
res := c.pathManager.readerAdd(pathReaderAddReq{
author: c,
pathName: pathName,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
return c.authenticate(pathName, pathIPs, pathUser, pathPass, false, query, rawQuery)
credentials: authCredentials{
query: rawQuery,
ip: c.ip(),
user: query.Get("user"),
pass: query.Get("pass"),
proto: authProtocolRTMP,
id: &c.uuid,
},
})
if res.err != nil {
if terr, ok := res.err.(pathErrAuthCritical); ok {
if terr, ok := res.err.(pathErrAuth); ok {
// wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.message)
return terr.wrapped
}
return res.err
}
@ -716,20 +714,21 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -716,20 +714,21 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
res := c.pathManager.publisherAdd(pathPublisherAddReq{
author: c,
pathName: pathName,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
return c.authenticate(pathName, pathIPs, pathUser, pathPass, true, query, rawQuery)
credentials: authCredentials{
query: rawQuery,
ip: c.ip(),
user: query.Get("user"),
pass: query.Get("pass"),
proto: authProtocolRTMP,
id: &c.uuid,
},
})
if res.err != nil {
if terr, ok := res.err.(pathErrAuthCritical); ok {
if terr, ok := res.err.(pathErrAuth); ok {
// wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.message)
return terr.wrapped
}
return res.err
}
@ -819,54 +818,6 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -819,54 +818,6 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
}
}
func (c *rtmpConn) authenticate(
pathName string,
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
isPublishing bool,
query url.Values,
rawQuery string,
) error {
if c.externalAuthenticationURL != "" {
err := externalAuth(
c.externalAuthenticationURL,
c.ip().String(),
query.Get("user"),
query.Get("pass"),
pathName,
externalAuthProtoRTMP,
&c.uuid,
isPublishing,
rawQuery)
if err != nil {
return pathErrAuthCritical{
message: fmt.Sprintf("external authentication failed: %s", err),
}
}
}
if pathIPs != nil {
ip := c.ip()
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
message: fmt.Sprintf("IP '%s' not allowed", ip),
}
}
}
if pathUser != "" {
if query.Get("user") != string(pathUser) ||
query.Get("pass") != string(pathPass) {
return pathErrAuthCritical{
message: "invalid credentials",
}
}
}
return nil
}
// apiReaderDescribe implements reader.
func (c *rtmpConn) apiReaderDescribe() interface{} {
return struct {

62
internal/core/rtmp_server.go

@ -48,18 +48,17 @@ type rtmpServerParent interface { @@ -48,18 +48,17 @@ type rtmpServerParent interface {
}
type rtmpServer struct {
externalAuthenticationURL string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
isTLS bool
rtspAddress string
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtmpServerParent
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
isTLS bool
rtspAddress string
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtmpServerParent
ctx context.Context
ctxCancel func()
@ -75,7 +74,6 @@ type rtmpServer struct { @@ -75,7 +74,6 @@ type rtmpServer struct {
func newRTMPServer(
parentCtx context.Context,
externalAuthenticationURL string,
address string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
@ -111,25 +109,24 @@ func newRTMPServer( @@ -111,25 +109,24 @@ func newRTMPServer(
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &rtmpServer{
externalAuthenticationURL: externalAuthenticationURL,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
isTLS: isTLS,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
conns: make(map[*rtmpConn]struct{}),
chConnClose: make(chan *rtmpConn),
chAPIConnsList: make(chan rtmpServerAPIConnsListReq),
chAPIConnsKick: make(chan rtmpServerAPIConnsKickReq),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
isTLS: isTLS,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
conns: make(map[*rtmpConn]struct{}),
chConnClose: make(chan *rtmpConn),
chAPIConnsList: make(chan rtmpServerAPIConnsListReq),
chAPIConnsKick: make(chan rtmpServerAPIConnsKickReq),
}
s.Log(logger.Info, "listener opened on %s", address)
@ -200,7 +197,6 @@ outer: @@ -200,7 +197,6 @@ outer:
c := newRTMPConn(
s.ctx,
s.isTLS,
s.externalAuthenticationURL,
s.rtspAddress,
s.readTimeout,
s.writeTimeout,

235
internal/core/rtsp_conn.go

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package core
import (
"errors"
"fmt"
"net"
"time"
@ -10,7 +9,6 @@ import ( @@ -10,7 +9,6 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/auth"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/headers"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/google/uuid"
"github.com/aler9/mediamtx/internal/conf"
@ -27,28 +25,24 @@ type rtspConnParent interface { @@ -27,28 +25,24 @@ type rtspConnParent interface {
}
type rtspConn struct {
externalAuthenticationURL string
rtspAddress string
authMethods []headers.AuthMethod
readTimeout conf.StringDuration
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
pathManager *pathManager
conn *gortsplib.ServerConn
parent rtspConnParent
uuid uuid.UUID
created time.Time
onConnectCmd *externalcmd.Cmd
authUser string
authPass string
authValidator *auth.Validator
authFailures int
rtspAddress string
authMethods []headers.AuthMethod
readTimeout conf.StringDuration
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
pathManager *pathManager
conn *gortsplib.ServerConn
parent rtspConnParent
uuid uuid.UUID
created time.Time
onConnectCmd *externalcmd.Cmd
authNonce string
authFailures int
}
func newRTSPConn(
externalAuthenticationURL string,
rtspAddress string,
authMethods []headers.AuthMethod,
readTimeout conf.StringDuration,
@ -60,18 +54,17 @@ func newRTSPConn( @@ -60,18 +54,17 @@ func newRTSPConn(
parent rtspConnParent,
) *rtspConn {
c := &rtspConn{
externalAuthenticationURL: externalAuthenticationURL,
rtspAddress: rtspAddress,
authMethods: authMethods,
readTimeout: readTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
conn: conn,
parent: parent,
uuid: uuid.New(),
created: time.Now(),
rtspAddress: rtspAddress,
authMethods: authMethods,
readTimeout: readTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
conn: conn,
parent: parent,
uuid: uuid.New(),
created: time.Now(),
}
c.Log(logger.Info, "opened")
@ -112,127 +105,6 @@ func (c *rtspConn) ip() net.IP { @@ -112,127 +105,6 @@ func (c *rtspConn) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
func (c *rtspConn) authenticate(
path string,
query string,
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
isPublishing bool,
req *base.Request,
baseURL *url.URL,
) error {
if c.externalAuthenticationURL != "" {
username := ""
password := ""
var auth headers.Authorization
err := auth.Unmarshal(req.Header["Authorization"])
if err == nil && auth.Method == headers.AuthBasic {
username = auth.BasicUser
password = auth.BasicPass
}
err = externalAuth(
c.externalAuthenticationURL,
c.ip().String(),
username,
password,
path,
externalAuthProtoRTSP,
&c.uuid,
isPublishing,
query)
if err != nil {
c.authFailures++
// VLC with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
return pathErrAuthCritical{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
}
v := "IPCAM"
return pathErrAuthNotCritical{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": headers.Authenticate{
Method: headers.AuthBasic,
Realm: &v,
}.Marshal(),
},
},
}
}
}
if pathIPs != nil {
ip := c.ip()
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
message: fmt.Sprintf("IP '%s' not allowed", ip),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
}
}
if pathUser != "" {
// reset authValidator every time the credentials change
if c.authValidator == nil || c.authUser != string(pathUser) || c.authPass != string(pathPass) {
c.authUser = string(pathUser)
c.authPass = string(pathPass)
c.authValidator = auth.NewValidator(string(pathUser), string(pathPass), c.authMethods)
}
err := c.authValidator.ValidateRequest(req, baseURL)
if err != nil {
c.authFailures++
// VLC with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
return pathErrAuthCritical{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
}
return pathErrAuthNotCritical{
response: &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.Header(),
},
},
}
}
// login successful, reset authFailures
c.authFailures = 0
}
return nil
}
// onClose is called by rtspServer.
func (c *rtspConn) onClose(err error) {
c.Log(logger.Info, "closed (%v)", err)
@ -263,29 +135,28 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -263,29 +135,28 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
}
ctx.Path = ctx.Path[1:]
if c.authNonce == "" {
c.authNonce = auth.GenerateNonce()
}
res := c.pathManager.describe(pathDescribeReq{
pathName: ctx.Path,
url: ctx.Request.URL,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
return c.authenticate(ctx.Path, ctx.Query, pathIPs, pathUser, pathPass, false, ctx.Request, nil)
credentials: authCredentials{
query: ctx.Query,
ip: c.ip(),
proto: authProtocolRTSP,
id: &c.uuid,
rtspRequest: ctx.Request,
rtspNonce: c.authNonce,
},
})
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
c.Log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(rtspConnPauseAfterAuthError)
return terr.response, nil, errors.New(terr.message)
case pathErrAuth:
res, err := c.handleAuthError(terr.wrapped)
return res, nil, err
case pathErrNoOnePublishing:
return &base.Response{
@ -312,3 +183,29 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -312,3 +183,29 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
StatusCode: base.StatusOK,
}, res.stream.rtspStream, nil
}
func (c *rtspConn) handleAuthError(authErr error) (*base.Response, error) {
c.authFailures++
// VLC with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures <= 3 {
return &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": auth.GenerateWWWAuthenticate(c.authMethods, "IPCAM", c.authNonce),
},
}, nil
}
// wait some seconds to stop brute force attacks
<-time.After(rtspConnPauseAfterAuthError)
return &base.Response{
StatusCode: base.StatusUnauthorized,
}, authErr
}

56
internal/core/rtsp_server.go

@ -76,18 +76,17 @@ func printAddresses(srv *gortsplib.Server) string { @@ -76,18 +76,17 @@ func printAddresses(srv *gortsplib.Server) string {
}
type rtspServer struct {
externalAuthenticationURL string
authMethods []headers.AuthMethod
readTimeout conf.StringDuration
isTLS bool
rtspAddress string
protocols map[conf.Protocol]struct{}
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtspServerParent
authMethods []headers.AuthMethod
readTimeout conf.StringDuration
isTLS bool
rtspAddress string
protocols map[conf.Protocol]struct{}
runOnConnect string
runOnConnectRestart bool
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtspServerParent
ctx context.Context
ctxCancel func()
@ -100,7 +99,6 @@ type rtspServer struct { @@ -100,7 +99,6 @@ type rtspServer struct {
func newRTSPServer(
parentCtx context.Context,
externalAuthenticationURL string,
address string,
authMethods []headers.AuthMethod,
readTimeout conf.StringDuration,
@ -128,22 +126,21 @@ func newRTSPServer( @@ -128,22 +126,21 @@ func newRTSPServer(
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &rtspServer{
externalAuthenticationURL: externalAuthenticationURL,
authMethods: authMethods,
readTimeout: readTimeout,
isTLS: isTLS,
rtspAddress: rtspAddress,
protocols: protocols,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
conns: make(map[*gortsplib.ServerConn]*rtspConn),
sessions: make(map[*gortsplib.ServerSession]*rtspSession),
authMethods: authMethods,
readTimeout: readTimeout,
isTLS: isTLS,
rtspAddress: rtspAddress,
protocols: protocols,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
conns: make(map[*gortsplib.ServerConn]*rtspConn),
sessions: make(map[*gortsplib.ServerSession]*rtspSession),
}
s.srv = &gortsplib.Server{
@ -246,7 +243,6 @@ outer: @@ -246,7 +243,6 @@ outer:
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.
func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
c := newRTSPConn(
s.externalAuthenticationURL,
s.rtspAddress,
s.authMethods,
s.readTimeout,

15
internal/core/rtsp_server_test.go

@ -115,13 +115,14 @@ func TestRTSPServerAuth(t *testing.T) { @@ -115,13 +115,14 @@ func TestRTSPServerAuth(t *testing.T) {
}
t.Run("hashed", func(t *testing.T) {
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ=\n" +
" publishPass: sha256:E9JJ8stBJ7QM+nV4ZoUCeHk/gU3tPFh/5YieiJp6n2w=\n")
p, ok := newInstance(
"rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ=\n" +
" publishPass: sha256:E9JJ8stBJ7QM+nV4ZoUCeHk/gU3tPFh/5YieiJp6n2w=\n")
require.Equal(t, true, ok)
defer p.Close()

91
internal/core/rtsp_session.go

@ -2,13 +2,13 @@ package core @@ -2,13 +2,13 @@ package core
import (
"encoding/hex"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/auth"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
@ -22,10 +22,6 @@ import ( @@ -22,10 +22,6 @@ import (
"github.com/aler9/mediamtx/internal/logger"
)
const (
pauseAfterAuthError = 2 * time.Second
)
type rtspWriteFunc func(*rtp.Packet)
func getRTSPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc {
@ -202,29 +198,28 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -202,29 +198,28 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
}
ctx.Path = ctx.Path[1:]
if c.authNonce == "" {
c.authNonce = auth.GenerateNonce()
}
res := s.pathManager.publisherAdd(pathPublisherAddReq{
author: s,
pathName: ctx.Path,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
return c.authenticate(ctx.Path, ctx.Query, pathIPs, pathUser, pathPass, true, ctx.Request, nil)
credentials: authCredentials{
query: ctx.Query,
ip: c.ip(),
proto: authProtocolRTSP,
id: &c.uuid,
rtspRequest: ctx.Request,
rtspBaseURL: nil,
rtspNonce: c.authNonce,
},
})
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
s.Log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError)
return terr.response, errors.New(terr.message)
case pathErrAuth:
return c.handleAuthError(terr.wrapped)
default:
return &base.Response{
@ -268,42 +263,42 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -268,42 +263,42 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.session.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
baseURL := &url.URL{
Scheme: ctx.Request.URL.Scheme,
Host: ctx.Request.URL.Host,
Path: ctx.Path,
RawQuery: ctx.Query,
}
if ctx.Query != "" {
baseURL.RawQuery += "/"
} else {
baseURL.Path += "/"
}
if c.authNonce == "" {
c.authNonce = auth.GenerateNonce()
}
res := s.pathManager.readerAdd(pathReaderAddReq{
author: s,
pathName: ctx.Path,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
baseURL := &url.URL{
Scheme: ctx.Request.URL.Scheme,
Host: ctx.Request.URL.Host,
Path: ctx.Path,
RawQuery: ctx.Query,
}
if ctx.Query != "" {
baseURL.RawQuery += "/"
} else {
baseURL.Path += "/"
}
return c.authenticate(ctx.Path, ctx.Query, pathIPs, pathUser, pathPass, false, ctx.Request, baseURL)
credentials: authCredentials{
query: ctx.Query,
ip: c.ip(),
proto: authProtocolRTSP,
id: &c.uuid,
rtspRequest: ctx.Request,
rtspBaseURL: baseURL,
rtspNonce: c.authNonce,
},
})
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
s.Log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError)
return terr.response, nil, errors.New(terr.message)
case pathErrAuth:
res, err := c.handleAuthError(terr.wrapped)
return res, nil, err
case pathErrNoOnePublishing:
return &base.Response{

9
internal/core/webrtc_conn.go

@ -26,7 +26,6 @@ import ( @@ -26,7 +26,6 @@ import (
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/aler9/mediamtx/internal/conf"
"github.com/aler9/mediamtx/internal/formatprocessor"
"github.com/aler9/mediamtx/internal/logger"
"github.com/aler9/mediamtx/internal/websocket"
@ -302,13 +301,7 @@ func (c *webRTCConn) runInner(ctx context.Context) error { @@ -302,13 +301,7 @@ func (c *webRTCConn) runInner(ctx context.Context) error {
res := c.pathManager.readerAdd(pathReaderAddReq{
author: c,
pathName: c.pathName,
authenticate: func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error {
return nil
},
skipAuth: true,
})
if res.err != nil {
return res.err

161
internal/core/webrtc_server.go

@ -68,14 +68,13 @@ type webRTCServerParent interface { @@ -68,14 +68,13 @@ type webRTCServerParent interface {
}
type webRTCServer struct {
externalAuthenticationURL string
allowOrigin string
trustedProxies conf.IPsOrCIDRs
iceServers []string
readBufferCount int
pathManager *pathManager
metrics *metrics
parent webRTCServerParent
allowOrigin string
trustedProxies conf.IPsOrCIDRs
iceServers []string
readBufferCount int
pathManager *pathManager
metrics *metrics
parent webRTCServerParent
ctx context.Context
ctxCancel func()
@ -101,7 +100,6 @@ type webRTCServer struct { @@ -101,7 +100,6 @@ type webRTCServer struct {
func newWebRTCServer(
parentCtx context.Context,
externalAuthenticationURL string,
address string,
encryption bool,
serverKey string,
@ -159,28 +157,27 @@ func newWebRTCServer( @@ -159,28 +157,27 @@ func newWebRTCServer(
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &webRTCServer{
externalAuthenticationURL: externalAuthenticationURL,
allowOrigin: allowOrigin,
trustedProxies: trustedProxies,
iceServers: iceServers,
readBufferCount: readBufferCount,
pathManager: pathManager,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
udpMuxLn: udpMuxLn,
tcpMuxLn: tcpMuxLn,
iceUDPMux: iceUDPMux,
iceTCPMux: iceTCPMux,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
conns: make(map[*webRTCConn]struct{}),
connNew: make(chan webRTCConnNewReq),
chConnClose: make(chan *webRTCConn),
chAPIConnsList: make(chan webRTCServerAPIConnsListReq),
chAPIConnsKick: make(chan webRTCServerAPIConnsKickReq),
done: make(chan struct{}),
allowOrigin: allowOrigin,
trustedProxies: trustedProxies,
iceServers: iceServers,
readBufferCount: readBufferCount,
pathManager: pathManager,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
udpMuxLn: udpMuxLn,
tcpMuxLn: tcpMuxLn,
iceUDPMux: iceUDPMux,
iceTCPMux: iceTCPMux,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
conns: make(map[*webRTCConn]struct{}),
connNew: make(chan webRTCConnNewReq),
chConnClose: make(chan *webRTCConn),
chAPIConnsList: make(chan webRTCServerAPIConnsListReq),
chAPIConnsKick: make(chan webRTCServerAPIConnsKickReq),
done: make(chan struct{}),
}
s.requestPool = newHTTPRequestPool()
@ -357,26 +354,32 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) { @@ -357,26 +354,32 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) {
}
dir = strings.TrimSuffix(dir, "/")
res := s.pathManager.describe(pathDescribeReq{
pathName: dir,
user, pass, hasCredentials := ctx.Request.BasicAuth()
res := s.pathManager.getPathConf(pathGetPathConfReq{
name: dir,
credentials: authCredentials{
query: ctx.Request.URL.RawQuery,
ip: net.ParseIP(ctx.ClientIP()),
user: user,
pass: pass,
proto: authProtocolWebRTC,
},
})
if res.err != nil {
ctx.Writer.WriteHeader(http.StatusNotFound)
return
}
if terr, ok := res.err.(pathErrAuth); ok {
if !hasCredentials {
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return
}
err := s.authenticate(res.path, ctx)
if err != nil {
if terr, ok := err.(pathErrAuthCritical); ok {
s.Log(logger.Info, "authentication error: %s", terr.message)
ctx.Writer.Header().Set("WWW-Authenticate", `Basic realm="mediamtx"`)
s.Log(logger.Info, "authentication error: %v", terr.wrapped)
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return
}
ctx.Writer.Header().Set("WWW-Authenticate", `Basic realm="mediamtx"`)
ctx.Writer.WriteHeader(http.StatusUnauthorized)
ctx.Writer.WriteHeader(http.StatusNotFound)
return
}
@ -394,7 +397,10 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) { @@ -394,7 +397,10 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) {
}
defer wsconn.Close()
c := s.newConn(dir, wsconn)
c := s.newConn(webRTCConnNewReq{
pathName: dir,
wsconn: wsconn,
})
if c == nil {
return
}
@ -403,12 +409,8 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) { @@ -403,12 +409,8 @@ func (s *webRTCServer) onRequest(ctx *gin.Context) {
}
}
func (s *webRTCServer) newConn(dir string, wsconn *websocket.ServerConn) *webRTCConn {
req := webRTCConnNewReq{
pathName: dir,
wsconn: wsconn,
res: make(chan *webRTCConn),
}
func (s *webRTCServer) newConn(req webRTCConnNewReq) *webRTCConn {
req.res = make(chan *webRTCConn)
select {
case s.connNew <- req:
@ -418,63 +420,6 @@ func (s *webRTCServer) newConn(dir string, wsconn *websocket.ServerConn) *webRTC @@ -418,63 +420,6 @@ func (s *webRTCServer) newConn(dir string, wsconn *websocket.ServerConn) *webRTC
}
}
func (s *webRTCServer) authenticate(pa *path, ctx *gin.Context) error {
pathConf := pa.safeConf()
pathIPs := pathConf.ReadIPs
pathUser := pathConf.ReadUser
pathPass := pathConf.ReadPass
if s.externalAuthenticationURL != "" {
ip := net.ParseIP(ctx.ClientIP())
user, pass, ok := ctx.Request.BasicAuth()
err := externalAuth(
s.externalAuthenticationURL,
ip.String(),
user,
pass,
pa.name,
externalAuthProtoWebRTC,
nil,
false,
ctx.Request.URL.RawQuery)
if err != nil {
if !ok {
return pathErrAuthNotCritical{}
}
return pathErrAuthCritical{
message: fmt.Sprintf("external authentication failed: %s", err),
}
}
}
if pathIPs != nil {
ip := net.ParseIP(ctx.ClientIP())
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
message: fmt.Sprintf("IP '%s' not allowed", ip),
}
}
}
if pathUser != "" {
user, pass, ok := ctx.Request.BasicAuth()
if !ok {
return pathErrAuthNotCritical{}
}
if user != string(pathUser) || pass != string(pathPass) {
return pathErrAuthCritical{
message: "invalid credentials",
}
}
}
return nil
}
// connClose is called by webRTCConn.
func (s *webRTCServer) connClose(c *webRTCConn) {
select {

3
internal/websocket/serverconn.go

@ -23,7 +23,8 @@ var upgrader = websocket.Upgrader{ @@ -23,7 +23,8 @@ var upgrader = websocket.Upgrader{
},
}
// ServerConn is a server-side WebSocket connection with automatic, periodic ping / pong.
// ServerConn is a server-side WebSocket connection with
// automatic, periodic ping-pong
type ServerConn struct {
wc *websocket.Conn

5
mediamtx.yml

@ -96,8 +96,9 @@ multicastRTCPPort: 8003 @@ -96,8 +96,9 @@ multicastRTCPPort: 8003
serverKey: server.key
# Path to the server certificate. This is needed only when encryption is "strict" or "optional".
serverCert: server.crt
# Authentication methods.
authMethods: [basic, digest]
# Authentication methods. Available are "basic" and "digest".
# "digest" doesn't provide any additional security and is available for compatibility reasons only.
authMethods: [basic]
###############################################
# RTMP parameters

Loading…
Cancel
Save