Browse Source

add environment variable MTX_QUERY to some hooks (#2483) (#2522)

pull/2530/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
3a5bb06e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      README.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 69
      internal/core/authentication.go
  5. 16
      internal/core/hls_http_server.go
  6. 8
      internal/core/hls_muxer.go
  7. 7
      internal/core/mpegts.go
  8. 125
      internal/core/path.go
  9. 41
      internal/core/path_manager.go
  10. 62
      internal/core/path_test.go
  11. 32
      internal/core/publisher.go
  12. 52
      internal/core/reader.go
  13. 70
      internal/core/rtmp_conn.go
  14. 5
      internal/core/rtsp_conn.go
  15. 65
      internal/core/rtsp_session.go
  16. 43
      internal/core/source.go
  17. 69
      internal/core/srt_conn.go
  18. 16
      internal/core/webrtc_http_server.go
  19. 71
      internal/core/webrtc_session.go
  20. 3
      mediamtx.yml

3
README.md

@ -1311,6 +1311,7 @@ paths: @@ -1311,6 +1311,7 @@ paths:
# This is terminated with SIGINT when the program closes.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by first reader)
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
@ -1326,6 +1327,7 @@ pathDefaults: @@ -1326,6 +1327,7 @@ pathDefaults:
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by publisher)
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
@ -1351,6 +1353,7 @@ pathDefaults: @@ -1351,6 +1353,7 @@ pathDefaults:
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by reader)
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port

2
go.mod

@ -8,7 +8,7 @@ require ( @@ -8,7 +8,7 @@ require (
github.com/alecthomas/kong v0.8.1
github.com/aler9/writerseeker v1.1.0
github.com/bluenviron/gohlslib v1.0.4
github.com/bluenviron/gortsplib/v4 v4.2.1
github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2
github.com/bluenviron/mediacommon v1.5.0
github.com/datarhei/gosrt v0.5.4
github.com/fsnotify/fsnotify v1.6.0

4
go.sum

@ -14,8 +14,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh @@ -14,8 +14,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/bluenviron/gohlslib v1.0.4 h1:cNpNQSPzomM7fqSaGPbdRxpq5QtI3b0vjT3nWQ5/LrU=
github.com/bluenviron/gohlslib v1.0.4/go.mod h1:yMw/Gm+SeC498fHc7DnqKvczTMZGBdJdJUmAM+k+pu0=
github.com/bluenviron/gortsplib/v4 v4.2.1 h1:LugQr3TIKoj6GjOf470teDP8goWiL8PTrX2OaF+L2Vc=
github.com/bluenviron/gortsplib/v4 v4.2.1/go.mod h1:VOoeI2VxRKh5eEg6Y48DGb/oLxU1i+X0Xzv9z8dvsUQ=
github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2 h1:NkmQiKSdKAMuG+BFsxhSp6tf6laYJR3d+T+frUKBQAI=
github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2/go.mod h1:VOoeI2VxRKh5eEg6Y48DGb/oLxU1i+X0Xzv9z8dvsUQ=
github.com/bluenviron/mediacommon v1.5.0 h1:lS0YKNo22ZOyCsYcLh3jn3TgUALqYw0f7RVwalC09vI=
github.com/bluenviron/mediacommon v1.5.0/go.mod h1:Ij/kE1LEucSjryNBVTyPL/gBI0d6/Css3f5PyrM957w=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=

69
internal/core/authentication.go

@ -7,14 +7,11 @@ import ( @@ -7,14 +7,11 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"github.com/bluenviron/gortsplib/v4/pkg/auth"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf"
@ -53,23 +50,9 @@ const ( @@ -53,23 +50,9 @@ const (
authProtocolSRT authProtocol = "srt"
)
type authCredentials struct {
query string
ip net.IP
user string
pass string
proto authProtocol
id *uuid.UUID
rtspRequest *base.Request
rtspBaseURL *url.URL
rtspNonce string
}
func doExternalAuthentication(
ur string,
path string,
publish bool,
credentials authCredentials,
accessRequest pathAccessRequest,
) error {
enc, _ := json.Marshal(struct {
IP string `json:"ip"`
@ -81,19 +64,19 @@ func doExternalAuthentication( @@ -81,19 +64,19 @@ func doExternalAuthentication(
Action string `json:"action"`
Query string `json:"query"`
}{
IP: credentials.ip.String(),
User: credentials.user,
Password: credentials.pass,
Path: path,
Protocol: string(credentials.proto),
ID: credentials.id,
IP: accessRequest.ip.String(),
User: accessRequest.user,
Password: accessRequest.pass,
Path: accessRequest.name,
Protocol: string(accessRequest.proto),
ID: accessRequest.id,
Action: func() string {
if publish {
if accessRequest.publish {
return "publish"
}
return "read"
}(),
Query: credentials.query,
Query: accessRequest.query,
})
res, err := http.Post(ur, "application/json", bytes.NewReader(enc))
if err != nil {
@ -114,26 +97,22 @@ func doExternalAuthentication( @@ -114,26 +97,22 @@ func doExternalAuthentication(
func doAuthentication(
externalAuthenticationURL string,
rtspAuthMethods conf.AuthMethods,
pathName string,
pathConf *conf.Path,
publish bool,
credentials authCredentials,
accessRequest pathAccessRequest,
) error {
var rtspAuth headers.Authorization
if credentials.rtspRequest != nil {
err := rtspAuth.Unmarshal(credentials.rtspRequest.Header["Authorization"])
if accessRequest.rtspRequest != nil {
err := rtspAuth.Unmarshal(accessRequest.rtspRequest.Header["Authorization"])
if err == nil && rtspAuth.Method == headers.AuthBasic {
credentials.user = rtspAuth.BasicUser
credentials.pass = rtspAuth.BasicPass
accessRequest.user = rtspAuth.BasicUser
accessRequest.pass = rtspAuth.BasicPass
}
}
if externalAuthenticationURL != "" {
err := doExternalAuthentication(
externalAuthenticationURL,
pathName,
publish,
credentials,
accessRequest,
)
if err != nil {
return &errAuthentication{message: fmt.Sprintf("external authentication failed: %s", err)}
@ -144,7 +123,7 @@ func doAuthentication( @@ -144,7 +123,7 @@ func doAuthentication(
var pathUser string
var pathPass string
if publish {
if accessRequest.publish {
pathIPs = pathConf.PublishIPs
pathUser = string(pathConf.PublishUser)
pathPass = string(pathConf.PublishPass)
@ -155,26 +134,26 @@ func doAuthentication( @@ -155,26 +134,26 @@ func doAuthentication(
}
if pathIPs != nil {
if !ipEqualOrInRange(credentials.ip, pathIPs) {
return &errAuthentication{message: fmt.Sprintf("IP %s not allowed", credentials.ip)}
if !ipEqualOrInRange(accessRequest.ip, pathIPs) {
return &errAuthentication{message: fmt.Sprintf("IP %s not allowed", accessRequest.ip)}
}
}
if pathUser != "" {
if credentials.rtspRequest != nil && rtspAuth.Method == headers.AuthDigest {
if accessRequest.rtspRequest != nil && rtspAuth.Method == headers.AuthDigest {
err := auth.Validate(
credentials.rtspRequest,
accessRequest.rtspRequest,
pathUser,
pathPass,
credentials.rtspBaseURL,
accessRequest.rtspBaseURL,
rtspAuthMethods,
"IPCAM",
credentials.rtspNonce)
accessRequest.rtspNonce)
if err != nil {
return &errAuthentication{message: err.Error()}
}
} else if !checkCredential(pathUser, credentials.user) ||
!checkCredential(pathPass, credentials.pass) {
} else if !checkCredential(pathUser, accessRequest.user) ||
!checkCredential(pathPass, accessRequest.pass) {
return &errAuthentication{message: "invalid credentials"}
}
}

16
internal/core/hls_http_server.go

@ -163,14 +163,14 @@ func (s *hlsHTTPServer) onRequest(ctx *gin.Context) { @@ -163,14 +163,14 @@ func (s *hlsHTTPServer) onRequest(ctx *gin.Context) {
user, pass, hasCredentials := ctx.Request.BasicAuth()
res := s.pathManager.getConfForPath(pathGetConfForPathReq{
name: dir,
publish: false,
credentials: authCredentials{
query: ctx.Request.URL.RawQuery,
ip: net.ParseIP(ctx.ClientIP()),
user: user,
pass: pass,
proto: authProtocolHLS,
accessRequest: pathAccessRequest{
name: dir,
query: ctx.Request.URL.RawQuery,
publish: false,
ip: net.ParseIP(ctx.ClientIP()),
user: user,
pass: pass,
proto: authProtocolHLS,
},
})
if res.err != nil {

8
internal/core/hls_muxer.go

@ -242,9 +242,11 @@ func (m *hlsMuxer) clearQueuedRequests() { @@ -242,9 +242,11 @@ func (m *hlsMuxer) clearQueuedRequests() {
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.addReader(pathAddReaderReq{
author: m,
pathName: m.pathName,
skipAuth: true,
author: m,
accessRequest: pathAccessRequest{
name: m.pathName,
skipAuth: true,
},
})
if res.err != nil {
return res.err

7
internal/core/mpegts.go

@ -211,8 +211,11 @@ func mpegtsSetupRead(r *mpegts.Reader, stream **stream.Stream) ([]*description.M @@ -211,8 +211,11 @@ func mpegtsSetupRead(r *mpegts.Reader, stream **stream.Stream) ([]*description.M
return medias, nil
}
func mpegtsSetupWrite(stream *stream.Stream,
writer *asyncwriter.Writer, bw *bufio.Writer, sconn srt.Conn,
func mpegtsSetupWrite(
stream *stream.Stream,
writer *asyncwriter.Writer,
bw *bufio.Writer,
sconn srt.Conn,
writeTimeout time.Duration,
) error {
var w *mpegts.Writer

125
internal/core/path.go

@ -9,8 +9,10 @@ import ( @@ -9,8 +9,10 @@ import (
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
@ -50,6 +52,23 @@ const ( @@ -50,6 +52,23 @@ const (
pathOnDemandStateClosing
)
type pathAccessRequest struct {
name string
query string
publish bool
skipAuth bool
// only if skipAuth = false
ip net.IP
user string
pass string
proto authProtocol
id *uuid.UUID
rtspRequest *base.Request
rtspBaseURL *url.URL
rtspNonce string
}
type pathSourceStaticSetReadyRes struct {
stream *stream.Stream
err error
@ -81,10 +100,8 @@ type pathGetConfForPathRes struct { @@ -81,10 +100,8 @@ type pathGetConfForPathRes struct {
}
type pathGetConfForPathReq struct {
name string
publish bool
credentials authCredentials
res chan pathGetConfForPathRes
accessRequest pathAccessRequest
res chan pathGetConfForPathRes
}
type pathDescribeRes struct {
@ -95,10 +112,8 @@ type pathDescribeRes struct { @@ -95,10 +112,8 @@ type pathDescribeRes struct {
}
type pathDescribeReq struct {
pathName string
url *url.URL
credentials authCredentials
res chan pathDescribeRes
accessRequest pathAccessRequest
res chan pathDescribeRes
}
type pathAddReaderRes struct {
@ -108,11 +123,9 @@ type pathAddReaderRes struct { @@ -108,11 +123,9 @@ type pathAddReaderRes struct {
}
type pathAddReaderReq struct {
author reader
pathName string
skipAuth bool
credentials authCredentials
res chan pathAddReaderRes
author reader
accessRequest pathAccessRequest
res chan pathAddReaderRes
}
type pathAddPublisherRes struct {
@ -121,11 +134,9 @@ type pathAddPublisherRes struct { @@ -121,11 +134,9 @@ type pathAddPublisherRes struct {
}
type pathAddPublisherReq struct {
author publisher
pathName string
skipAuth bool
credentials authCredentials
res chan pathAddPublisherRes
author publisher
accessRequest pathAccessRequest
res chan pathAddPublisherRes
}
type pathStartPublisherRes struct {
@ -183,14 +194,15 @@ type path struct { @@ -183,14 +194,15 @@ type path struct {
ctxCancel func()
confMutex sync.RWMutex
source source
publisherQuery string
stream *stream.Stream
recordAgent *record.Agent
readyTime time.Time
onUnDemandHook func(string)
onNotReadyHook func()
readers map[reader]struct{}
describeRequestsOnHold []pathDescribeReq
readerAddRequestsOnHold []pathAddReaderReq
onDemandCmd *externalcmd.Cmd
onReadyCmd *externalcmd.Cmd
onDemandStaticSourceState pathOnDemandState
onDemandStaticSourceReadyTimer *time.Timer
onDemandStaticSourceCloseTimer *time.Timer
@ -358,9 +370,8 @@ func (pa *path) run() { @@ -358,9 +370,8 @@ func (pa *path) run() {
}
}
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.Log(logger.Info, "runOnDemand command stopped")
if pa.onUnDemandHook != nil {
pa.onUnDemandHook("path closed")
}
pa.Log(logger.Debug, "destroyed: %v", err)
@ -578,7 +589,7 @@ func (pa *path) doDescribe(req pathDescribeReq) { @@ -578,7 +589,7 @@ func (pa *path) doDescribe(req pathDescribeReq) {
if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart()
pa.onDemandPublisherStart(req.accessRequest.query)
}
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
return
@ -588,9 +599,9 @@ func (pa *path) doDescribe(req pathDescribeReq) { @@ -588,9 +599,9 @@ func (pa *path) doDescribe(req pathDescribeReq) {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := url.URL{
Scheme: req.url.Scheme,
User: req.url.User,
Host: req.url.Host,
Scheme: req.accessRequest.rtspRequest.URL.Scheme,
User: req.accessRequest.rtspRequest.URL.User,
Host: req.accessRequest.rtspRequest.URL.Host,
Path: pa.conf.Fallback,
}
return ur.String()
@ -631,6 +642,7 @@ func (pa *path) doAddPublisher(req pathAddPublisherReq) { @@ -631,6 +642,7 @@ func (pa *path) doAddPublisher(req pathAddPublisherReq) {
}
pa.source = req.author
pa.publisherQuery = req.accessRequest.query
req.res <- pathAddPublisherRes{path: pa}
}
@ -696,7 +708,7 @@ func (pa *path) doAddReader(req pathAddReaderReq) { @@ -696,7 +708,7 @@ func (pa *path) doAddReader(req pathAddReaderReq) {
if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart()
pa.onDemandPublisherStart(req.accessRequest.query)
}
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
return
@ -825,16 +837,8 @@ func (pa *path) onDemandStaticSourceStop(reason string) { @@ -825,16 +837,8 @@ func (pa *path) onDemandStaticSourceStop(reason string) {
pa.source.(*sourceStatic).stop(reason)
}
func (pa *path) onDemandPublisherStart() {
pa.Log(logger.Info, "runOnDemand command started")
pa.onDemandCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart,
pa.externalCmdEnv(),
func(err error) {
pa.Log(logger.Info, "runOnDemand command exited: %v", err)
})
func (pa *path) onDemandPublisherStart(query string) {
pa.onUnDemandHook = publisherOnDemandHook(pa, query)
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
@ -862,11 +866,8 @@ func (pa *path) onDemandPublisherStop(reason string) { @@ -862,11 +866,8 @@ func (pa *path) onDemandPublisherStop(reason string) {
pa.onDemandPublisherState = pathOnDemandStateInitial
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.Log(logger.Info, "runOnDemand command stopped: %s", reason)
}
pa.onUnDemandHook(reason)
pa.onUnDemandHook = nil
}
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
@ -887,22 +888,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error @@ -887,22 +888,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
pa.readyTime = time.Now()
if pa.conf.RunOnReady != "" {
env := pa.externalCmdEnv()
desc := pa.source.apiSourceDescribe()
env["MTX_SOURCE_TYPE"] = desc.Type
env["MTX_SOURCE_ID"] = desc.ID
pa.Log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnReady,
pa.conf.RunOnReadyRestart,
env,
func(err error) {
pa.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
pa.onNotReadyHook = sourceOnReadyHook(pa)
pa.parent.pathReady(pa)
@ -917,26 +903,7 @@ func (pa *path) setNotReady() { @@ -917,26 +903,7 @@ func (pa *path) setNotReady() {
r.close()
}
if pa.onReadyCmd != nil {
pa.onReadyCmd.Close()
pa.onReadyCmd = nil
pa.Log(logger.Info, "runOnReady command stopped")
}
if pa.conf.RunOnNotReady != "" {
env := pa.externalCmdEnv()
desc := pa.source.apiSourceDescribe()
env["MTX_SOURCE_TYPE"] = desc.Type
env["MTX_SOURCE_ID"] = desc.ID
pa.Log(logger.Info, "runOnNotReady command launched")
externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnNotReady,
false,
env,
nil)
}
pa.onNotReadyHook()
if pa.recordAgent != nil {
pa.recordAgent.Close()

41
internal/core/path_manager.go

@ -284,14 +284,14 @@ func (pm *pathManager) doPathNotReady(pa *path) { @@ -284,14 +284,14 @@ func (pm *pathManager) doPathNotReady(pa *path) {
}
func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) {
_, pathConf, _, err := getConfForPath(pm.pathConfs, req.name)
_, pathConf, _, err := getConfForPath(pm.pathConfs, req.accessRequest.name)
if err != nil {
req.res <- pathGetConfForPathRes{err: err}
return
}
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
req.name, pathConf, req.publish, req.credentials)
pathConf, req.accessRequest)
if err != nil {
req.res <- pathGetConfForPathRes{err: err}
return
@ -301,35 +301,37 @@ func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) { @@ -301,35 +301,37 @@ func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) {
}
func (pm *pathManager) doDescribe(req pathDescribeReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name)
if err != nil {
req.res <- pathDescribeRes{err: err}
return
}
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
pathConf, req.accessRequest)
if err != nil {
req.res <- pathDescribeRes{err: err}
return
}
// create path if it doesn't exist
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
if _, ok := pm.paths[req.accessRequest.name]; !ok {
pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches)
}
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
req.res <- pathDescribeRes{path: pm.paths[req.accessRequest.name]}
}
func (pm *pathManager) doAddReader(req pathAddReaderReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name)
if err != nil {
req.res <- pathAddReaderRes{err: err}
return
}
if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if !req.accessRequest.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
pathConf, req.accessRequest)
if err != nil {
req.res <- pathAddReaderRes{err: err}
return
@ -337,22 +339,23 @@ func (pm *pathManager) doAddReader(req pathAddReaderReq) { @@ -337,22 +339,23 @@ func (pm *pathManager) doAddReader(req pathAddReaderReq) {
}
// create path if it doesn't exist
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
if _, ok := pm.paths[req.accessRequest.name]; !ok {
pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches)
}
req.res <- pathAddReaderRes{path: pm.paths[req.pathName]}
req.res <- pathAddReaderRes{path: pm.paths[req.accessRequest.name]}
}
func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name)
if err != nil {
req.res <- pathAddPublisherRes{err: err}
return
}
if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if !req.accessRequest.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
pathConf, req.accessRequest)
if err != nil {
req.res <- pathAddPublisherRes{err: err}
return
@ -360,11 +363,11 @@ func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) { @@ -360,11 +363,11 @@ func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) {
}
// create path if it doesn't exist
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
if _, ok := pm.paths[req.accessRequest.name]; !ok {
pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches)
}
req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]}
req.res <- pathAddPublisherRes{path: pm.paths[req.accessRequest.name]}
}
func (pm *pathManager) doAPIPathsList(req pathAPIPathsListReq) {

62
internal/core/path_test.go

@ -9,6 +9,7 @@ import ( @@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
@ -25,11 +26,7 @@ import ( @@ -25,11 +26,7 @@ import (
"github.com/bluenviron/mediamtx/internal/rtmp"
)
func TestPathRunOnDemand(t *testing.T) {
onDemandFile := filepath.Join(os.TempDir(), "ondemand")
srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile, []byte(`
var runOnDemandSampleScript = `
package main
import (
@ -42,7 +39,9 @@ import ( @@ -42,7 +39,9 @@ import (
)
func main() {
if os.Getenv("G1") != "on" {
if os.Getenv("MTX_PATH") != "ondemand" ||
os.Getenv("MTX_QUERY") != "param=value" ||
os.Getenv("G1") != "on" {
panic("environment not set")
}
@ -74,12 +73,19 @@ func main() { @@ -74,12 +73,19 @@ func main() {
signal.Notify(c, syscall.SIGINT)
<-c
err = os.WriteFile("`+onDemandFile+`", []byte(""), 0644)
err = os.WriteFile("ON_DEMAND_FILE", []byte(""), 0644)
if err != nil {
panic(err)
}
}
`), 0o644)
`
func TestPathRunOnDemand(t *testing.T) {
onDemandFile := filepath.Join(os.TempDir(), "ondemand")
srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile,
[]byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemandFile)), 0o644)
require.NoError(t, err)
execFile := filepath.Join(os.TempDir(), "ondemand_cmd")
@ -115,7 +121,7 @@ func main() { @@ -115,7 +121,7 @@ func main() {
br := bufio.NewReader(conn)
if ca == "describe" || ca == "describe and setup" {
u, err := rtspurl.Parse("rtsp://localhost:8554/ondemand")
u, err := rtspurl.Parse("rtsp://localhost:8554/ondemand?param=value")
require.NoError(t, err)
byts, _ := base.Request{
@ -138,7 +144,7 @@ func main() { @@ -138,7 +144,7 @@ func main() {
require.NoError(t, err)
control, _ = desc.MediaDescriptions[0].Attribute("control")
} else {
control = "rtsp://localhost:8554/ondemand/"
control = "rtsp://localhost:8554/ondemand?param=value/"
}
if ca == "setup" || ca == "describe and setup" {
@ -259,15 +265,15 @@ func TestPathRunOnReady(t *testing.T) { @@ -259,15 +265,15 @@ func TestPathRunOnReady(t *testing.T) {
"webrtc: no\n"+
"paths:\n"+
" test:\n"+
" runOnReady: touch %s\n"+
" runOnNotReady: touch %s\n",
" runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadyFile, onNotReadyFile))
require.Equal(t, true, ok)
defer p.Close()
c := gortsplib.Client{}
err := c.StartRecording(
"rtsp://localhost:8554/test",
"rtsp://localhost:8554/test?query=value",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer c.Close()
@ -275,11 +281,13 @@ func TestPathRunOnReady(t *testing.T) { @@ -275,11 +281,13 @@ func TestPathRunOnReady(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onReadyFile)
byts, err := os.ReadFile(onReadyFile)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
_, err = os.Stat(onNotReadyFile)
byts, err = os.ReadFile(onNotReadyFile)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
}
func TestPathRunOnRead(t *testing.T) {
@ -295,8 +303,8 @@ func TestPathRunOnRead(t *testing.T) { @@ -295,8 +303,8 @@ func TestPathRunOnRead(t *testing.T) {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
" runOnRead: touch %s\n"+
" runOnUnread: touch %s\n",
" runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadFile, onUnreadFile))
require.Equal(t, true, ok)
defer p.Close()
@ -312,7 +320,7 @@ func TestPathRunOnRead(t *testing.T) { @@ -312,7 +320,7 @@ func TestPathRunOnRead(t *testing.T) {
case "rtsp":
reader := gortsplib.Client{}
u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/test")
u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/test?query=value")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)
@ -329,7 +337,7 @@ func TestPathRunOnRead(t *testing.T) { @@ -329,7 +337,7 @@ func TestPathRunOnRead(t *testing.T) {
require.NoError(t, err)
case "rtmp":
u, err := url.Parse("rtmp://127.0.0.1:1935/test")
u, err := url.Parse("rtmp://127.0.0.1:1935/test?query=value")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
@ -356,18 +364,28 @@ func TestPathRunOnRead(t *testing.T) { @@ -356,18 +364,28 @@ func TestPathRunOnRead(t *testing.T) {
case "webrtc":
hc := &http.Client{Transport: &http.Transport{}}
c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep", false)
c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep?query=value", false)
defer c.close()
}
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onReadFile)
byts, err := os.ReadFile(onReadFile)
require.NoError(t, err)
if ca == "srt" {
require.Equal(t, "test \n", string(byts))
} else {
require.Equal(t, "test query=value\n", string(byts))
}
_, err = os.Stat(onUnreadFile)
byts, err = os.ReadFile(onUnreadFile)
require.NoError(t, err)
if ca == "srt" {
require.Equal(t, "test \n", string(byts))
} else {
require.Equal(t, "test query=value\n", string(byts))
}
})
}
}

32
internal/core/publisher.go

@ -1,7 +1,39 @@ @@ -1,7 +1,39 @@
package core
import (
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// publisher is an entity that can publish a stream.
type publisher interface {
source
close()
}
func publisherOnDemandHook(path *path, query string) func(string) {
var onDemandCmd *externalcmd.Cmd
if path.conf.RunOnDemand != "" {
env := path.externalCmdEnv()
env["MTX_QUERY"] = query
path.Log(logger.Info, "runOnDemand command started")
onDemandCmd = externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnDemand,
path.conf.RunOnDemandRestart,
env,
func(err error) {
path.Log(logger.Info, "runOnDemand command exited: %v", err)
})
}
return func(reason string) {
if onDemandCmd != nil {
onDemandCmd.Close()
path.Log(logger.Info, "runOnDemand command stopped: %v", reason)
}
}
}

52
internal/core/reader.go

@ -2,6 +2,9 @@ package core @@ -2,6 +2,9 @@ package core
import (
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
@ -14,3 +17,52 @@ type reader interface { @@ -14,3 +17,52 @@ type reader interface {
func readerMediaInfo(r *asyncwriter.Writer, stream *stream.Stream) string {
return mediaInfo(stream.MediasForReader(r))
}
func readerOnReadHook(
externalCmdPool *externalcmd.Pool,
pathConf *conf.Path,
path *path,
reader apiPathSourceOrReader,
query string,
l logger.Writer,
) func() {
var env externalcmd.Environment
var onReadCmd *externalcmd.Cmd
if pathConf.RunOnRead != "" || pathConf.RunOnUnread != "" {
env = path.externalCmdEnv()
desc := reader
env["MTX_QUERY"] = query
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
}
if pathConf.RunOnRead != "" {
l.Log(logger.Info, "runOnRead command started")
onReadCmd = externalcmd.NewCmd(
externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
l.Log(logger.Info, "runOnRead command exited: %v", err)
})
}
return func() {
if onReadCmd != nil {
onReadCmd.Close()
l.Log(logger.Info, "runOnRead command stopped")
}
if pathConf.RunOnUnread != "" {
l.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
externalCmdPool,
pathConf.RunOnUnread,
false,
env,
nil)
}
}
}

70
internal/core/rtmp_conn.go

@ -201,9 +201,9 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -201,9 +201,9 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.addReader(pathAddReaderReq{
author: c,
pathName: pathName,
credentials: authCredentials{
author: c,
accessRequest: pathAccessRequest{
name: pathName,
query: rawQuery,
ip: c.ip(),
user: query.Get("user"),
@ -255,43 +255,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -255,43 +255,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
pathConf := res.path.safeConf()
if pathConf.RunOnRead != "" {
env := res.path.externalCmdEnv()
desc := c.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
c.Log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
c.Log(logger.Info, "runOnRead command exited: %v", err)
})
defer func() {
onReadCmd.Close()
c.Log(logger.Info, "runOnRead command stopped")
}()
}
if pathConf.RunOnUnread != "" {
defer func() {
env := res.path.externalCmdEnv()
desc := c.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
c.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnUnread,
false,
res.path.externalCmdEnv(),
nil)
}()
}
onUnreadHook := readerOnReadHook(
c.externalCmdPool,
pathConf,
res.path,
c.apiReaderDescribe(),
rawQuery,
c)
defer onUnreadHook()
var err error
w, err = rtmp.NewWriter(conn, videoFormat, audioFormat)
@ -460,15 +431,16 @@ func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error { @@ -460,15 +431,16 @@ func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.addPublisher(pathAddPublisherReq{
author: c,
pathName: pathName,
credentials: authCredentials{
query: rawQuery,
ip: c.ip(),
user: query.Get("user"),
pass: query.Get("pass"),
proto: authProtocolRTMP,
id: &c.uuid,
author: c,
accessRequest: pathAccessRequest{
name: pathName,
query: rawQuery,
publish: true,
ip: c.ip(),
user: query.Get("user"),
pass: query.Get("pass"),
proto: authProtocolRTMP,
id: &c.uuid,
},
})

5
internal/core/rtsp_conn.go

@ -155,9 +155,8 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -155,9 +155,8 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
}
res := c.pathManager.describe(pathDescribeReq{
pathName: ctx.Path,
url: ctx.Request.URL,
credentials: authCredentials{
accessRequest: pathAccessRequest{
name: ctx.Path,
query: ctx.Query,
ip: c.ip(),
proto: authProtocolRTSP,

65
internal/core/rtsp_session.go

@ -44,7 +44,7 @@ type rtspSession struct { @@ -44,7 +44,7 @@ type rtspSession struct {
created time.Time
path *path
stream *stream.Stream
onReadCmd *externalcmd.Cmd // read
onUnreadHook func()
mutex sync.Mutex
state gortsplib.ServerSessionState
transport *gortsplib.Transport
@ -96,32 +96,10 @@ func (s *rtspSession) Log(level logger.Level, format string, args ...interface{} @@ -96,32 +96,10 @@ func (s *rtspSession) Log(level logger.Level, format string, args ...interface{}
s.parent.Log(level, "[session %s] "+format, append([]interface{}{id}, args...)...)
}
func (s *rtspSession) onUnread() {
if s.onReadCmd != nil {
s.Log(logger.Info, "runOnRead command stopped")
s.onReadCmd.Close()
}
if s.path.conf.RunOnUnread != "" {
env := s.path.externalCmdEnv()
desc := s.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
s.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
s.externalCmdPool,
s.path.conf.RunOnUnread,
false,
env,
nil)
}
}
// onClose is called by rtspServer.
func (s *rtspSession) onClose(err error) {
if s.session.State() == gortsplib.ServerSessionStatePlay {
s.onUnread()
s.onUnreadHook()
}
switch s.session.State() {
@ -158,10 +136,11 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -158,10 +136,11 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
}
res := s.pathManager.addPublisher(pathAddPublisherReq{
author: s,
pathName: ctx.Path,
credentials: authCredentials{
author: s,
accessRequest: pathAccessRequest{
name: ctx.Path,
query: ctx.Query,
publish: true,
ip: c.ip(),
proto: authProtocolRTSP,
id: &c.uuid,
@ -243,9 +222,9 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -243,9 +222,9 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
res := s.pathManager.addReader(pathAddReaderReq{
author: s,
pathName: ctx.Path,
credentials: authCredentials{
author: s,
accessRequest: pathAccessRequest{
name: ctx.Path,
query: ctx.Query,
ip: c.ip(),
proto: authProtocolRTSP,
@ -312,22 +291,14 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons @@ -312,22 +291,14 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
pathConf := s.path.safeConf()
if pathConf.RunOnRead != "" {
env := s.path.externalCmdEnv()
desc := s.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
s.Log(logger.Info, "runOnRead command started")
s.onReadCmd = externalcmd.NewCmd(
s.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
s.Log(logger.Info, "runOnRead command exited: %v", err)
})
}
s.onUnreadHook = readerOnReadHook(
s.externalCmdPool,
pathConf,
s.path,
s.apiReaderDescribe(),
s.session.SetuppedQuery(),
s,
)
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay
@ -386,7 +357,7 @@ func (s *rtspSession) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Res @@ -386,7 +357,7 @@ func (s *rtspSession) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Res
func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.session.State() {
case gortsplib.ServerSessionStatePlay:
s.onUnread()
s.onUnreadHook()
s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay

43
internal/core/source.go

@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
@ -46,3 +47,45 @@ func mediaInfo(medias []*description.Media) string { @@ -46,3 +47,45 @@ func mediaInfo(medias []*description.Media) string {
}(),
strings.Join(mediasDescription(medias), ", "))
}
func sourceOnReadyHook(path *path) func() {
var env externalcmd.Environment
var onReadyCmd *externalcmd.Cmd
if path.conf.RunOnReady != "" {
env = path.externalCmdEnv()
desc := path.source.apiSourceDescribe()
env["MTX_QUERY"] = path.publisherQuery
env["MTX_SOURCE_TYPE"] = desc.Type
env["MTX_SOURCE_ID"] = desc.ID
}
if path.conf.RunOnReady != "" {
path.Log(logger.Info, "runOnReady command started")
onReadyCmd = externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnReady,
path.conf.RunOnReadyRestart,
env,
func(err error) {
path.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
return func() {
if onReadyCmd != nil {
onReadyCmd.Close()
path.Log(logger.Info, "runOnReady command stopped")
}
if path.conf.RunOnNotReady != "" {
path.Log(logger.Info, "runOnNotReady command launched")
externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnNotReady,
false,
env,
nil)
}
}
}

69
internal/core/srt_conn.go

@ -207,14 +207,15 @@ func (c *srtConn) runInner2(req srtNewConnReq) (bool, error) { @@ -207,14 +207,15 @@ func (c *srtConn) runInner2(req srtNewConnReq) (bool, error) {
func (c *srtConn) runPublish(req srtNewConnReq, pathName string, user string, pass string) (bool, error) {
res := c.pathManager.addPublisher(pathAddPublisherReq{
author: c,
pathName: pathName,
credentials: authCredentials{
ip: c.ip(),
user: user,
pass: pass,
proto: authProtocolSRT,
id: &c.uuid,
author: c,
accessRequest: pathAccessRequest{
name: pathName,
ip: c.ip(),
publish: true,
user: user,
pass: pass,
proto: authProtocolSRT,
id: &c.uuid,
},
})
@ -304,9 +305,9 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { @@ -304,9 +305,9 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass string) (bool, error) {
res := c.pathManager.addReader(pathAddReaderReq{
author: c,
pathName: pathName,
credentials: authCredentials{
author: c,
accessRequest: pathAccessRequest{
name: pathName,
ip: c.ip(),
user: user,
pass: pass,
@ -360,43 +361,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass @@ -360,43 +361,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
pathConf := res.path.safeConf()
if pathConf.RunOnRead != "" {
env := res.path.externalCmdEnv()
desc := c.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
c.Log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
c.Log(logger.Info, "runOnRead command exited: %v", err)
})
defer func() {
onReadCmd.Close()
c.Log(logger.Info, "runOnRead command stopped")
}()
}
if pathConf.RunOnUnread != "" {
defer func() {
env := res.path.externalCmdEnv()
desc := c.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
c.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnUnread,
false,
env,
nil)
}()
}
onUnreadHook := readerOnReadHook(
c.externalCmdPool,
pathConf,
res.path,
c.apiReaderDescribe(),
"",
c,
)
defer onUnreadHook()
// disable read deadline
sconn.SetReadDeadline(time.Time{})

16
internal/core/webrtc_http_server.go

@ -171,14 +171,14 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { @@ -171,14 +171,14 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
// if request doesn't belong to a session, check authentication here
if !isWHIPorWHEP || ctx.Request.Method == http.MethodOptions {
res := s.pathManager.getConfForPath(pathGetConfForPathReq{
name: dir,
publish: publish,
credentials: authCredentials{
query: ctx.Request.URL.RawQuery,
ip: net.ParseIP(ip),
user: user,
pass: pass,
proto: authProtocolWebRTC,
accessRequest: pathAccessRequest{
name: dir,
query: ctx.Request.URL.RawQuery,
publish: publish,
ip: net.ParseIP(ip),
user: user,
pass: pass,
proto: authProtocolWebRTC,
},
})
if res.err != nil {

71
internal/core/webrtc_session.go

@ -277,15 +277,16 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -277,15 +277,16 @@ func (s *webRTCSession) runPublish() (int, error) {
ip, _, _ := net.SplitHostPort(s.req.remoteAddr)
res := s.pathManager.addPublisher(pathAddPublisherReq{
author: s,
pathName: s.req.pathName,
credentials: authCredentials{
query: s.req.query,
ip: net.ParseIP(ip),
user: s.req.user,
pass: s.req.pass,
proto: authProtocolWebRTC,
id: &s.uuid,
author: s,
accessRequest: pathAccessRequest{
name: s.req.pathName,
query: s.req.query,
publish: true,
ip: net.ParseIP(ip),
user: s.req.user,
pass: s.req.pass,
proto: authProtocolWebRTC,
id: &s.uuid,
},
})
if res.err != nil {
@ -418,9 +419,9 @@ func (s *webRTCSession) runRead() (int, error) { @@ -418,9 +419,9 @@ func (s *webRTCSession) runRead() (int, error) {
ip, _, _ := net.SplitHostPort(s.req.remoteAddr)
res := s.pathManager.addReader(pathAddReaderReq{
author: s,
pathName: s.req.pathName,
credentials: authCredentials{
author: s,
accessRequest: pathAccessRequest{
name: s.req.pathName,
query: s.req.query,
ip: net.ParseIP(ip),
user: s.req.user,
@ -521,43 +522,15 @@ func (s *webRTCSession) runRead() (int, error) { @@ -521,43 +522,15 @@ func (s *webRTCSession) runRead() (int, error) {
pathConf := res.path.safeConf()
if pathConf.RunOnRead != "" {
env := res.path.externalCmdEnv()
desc := s.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
s.Log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
s.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
s.Log(logger.Info, "runOnRead command exited: %v", err)
})
defer func() {
onReadCmd.Close()
s.Log(logger.Info, "runOnRead command stopped")
}()
}
if pathConf.RunOnUnread != "" {
defer func() {
env := res.path.externalCmdEnv()
desc := s.apiReaderDescribe()
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
s.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
s.externalCmdPool,
pathConf.RunOnUnread,
false,
env,
nil)
}()
}
onUnreadHook := readerOnReadHook(
s.externalCmdPool,
pathConf,
res.path,
s.apiReaderDescribe(),
s.req.query,
s,
)
defer onUnreadHook()
writer.Start()

3
mediamtx.yml

@ -473,6 +473,7 @@ pathDefaults: @@ -473,6 +473,7 @@ pathDefaults:
# This is terminated with SIGINT when the path is not requested anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by first reader)
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
@ -491,6 +492,7 @@ pathDefaults: @@ -491,6 +492,7 @@ pathDefaults:
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by publisher)
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
@ -507,6 +509,7 @@ pathDefaults: @@ -507,6 +509,7 @@ pathDefaults:
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by reader)
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.

Loading…
Cancel
Save