From 3a5bb06e26bb3013ac27ec75cc5179ee9edae855 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Wed, 18 Oct 2023 11:50:26 +0200 Subject: [PATCH] add environment variable MTX_QUERY to some hooks (#2483) (#2522) --- README.md | 3 + go.mod | 2 +- go.sum | 4 +- internal/core/authentication.go | 69 ++++++--------- internal/core/hls_http_server.go | 16 ++-- internal/core/hls_muxer.go | 8 +- internal/core/mpegts.go | 7 +- internal/core/path.go | 125 ++++++++++------------------ internal/core/path_manager.go | 41 ++++----- internal/core/path_test.go | 62 +++++++++----- internal/core/publisher.go | 32 +++++++ internal/core/reader.go | 52 ++++++++++++ internal/core/rtmp_conn.go | 70 +++++----------- internal/core/rtsp_conn.go | 5 +- internal/core/rtsp_session.go | 65 ++++----------- internal/core/source.go | 43 ++++++++++ internal/core/srt_conn.go | 69 +++++---------- internal/core/webrtc_http_server.go | 16 ++-- internal/core/webrtc_session.go | 71 +++++----------- mediamtx.yml | 3 + 20 files changed, 378 insertions(+), 385 deletions(-) diff --git a/README.md b/README.md index 1d32042b..25d7b8e6 100644 --- a/README.md +++ b/README.md @@ -1311,6 +1311,7 @@ paths: # This is terminated with SIGINT when the program closes. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by first reader) # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. @@ -1326,6 +1327,7 @@ pathDefaults: # This is terminated with SIGINT when the stream is not ready anymore. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by publisher) # * MTX_SOURCE_TYPE: source type # * MTX_SOURCE_ID: source ID # * RTSP_PORT: RTSP server port @@ -1351,6 +1353,7 @@ pathDefaults: # This is terminated with SIGINT when a client stops reading. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by reader) # * MTX_READER_TYPE: reader type # * MTX_READER_ID: reader ID # * RTSP_PORT: RTSP server port diff --git a/go.mod b/go.mod index 7139a94e..378b7989 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/alecthomas/kong v0.8.1 github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.0.4 - github.com/bluenviron/gortsplib/v4 v4.2.1 + github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2 github.com/bluenviron/mediacommon v1.5.0 github.com/datarhei/gosrt v0.5.4 github.com/fsnotify/fsnotify v1.6.0 diff --git a/go.sum b/go.sum index 302ec313..fb37d92c 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib v1.0.4 h1:cNpNQSPzomM7fqSaGPbdRxpq5QtI3b0vjT3nWQ5/LrU= github.com/bluenviron/gohlslib v1.0.4/go.mod h1:yMw/Gm+SeC498fHc7DnqKvczTMZGBdJdJUmAM+k+pu0= -github.com/bluenviron/gortsplib/v4 v4.2.1 h1:LugQr3TIKoj6GjOf470teDP8goWiL8PTrX2OaF+L2Vc= -github.com/bluenviron/gortsplib/v4 v4.2.1/go.mod h1:VOoeI2VxRKh5eEg6Y48DGb/oLxU1i+X0Xzv9z8dvsUQ= +github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2 h1:NkmQiKSdKAMuG+BFsxhSp6tf6laYJR3d+T+frUKBQAI= +github.com/bluenviron/gortsplib/v4 v4.2.2-0.20231017183154-2f7ed32139f2/go.mod h1:VOoeI2VxRKh5eEg6Y48DGb/oLxU1i+X0Xzv9z8dvsUQ= github.com/bluenviron/mediacommon v1.5.0 h1:lS0YKNo22ZOyCsYcLh3jn3TgUALqYw0f7RVwalC09vI= github.com/bluenviron/mediacommon v1.5.0/go.mod h1:Ij/kE1LEucSjryNBVTyPL/gBI0d6/Css3f5PyrM957w= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= diff --git a/internal/core/authentication.go b/internal/core/authentication.go index 6bf8631d..aed391d0 100644 --- a/internal/core/authentication.go +++ b/internal/core/authentication.go @@ -7,14 +7,11 @@ import ( "encoding/json" "fmt" "io" - "net" "net/http" "strings" "github.com/bluenviron/gortsplib/v4/pkg/auth" - "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/headers" - "github.com/bluenviron/gortsplib/v4/pkg/url" "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/conf" @@ -53,23 +50,9 @@ const ( authProtocolSRT authProtocol = "srt" ) -type authCredentials struct { - query string - ip net.IP - user string - pass string - proto authProtocol - id *uuid.UUID - rtspRequest *base.Request - rtspBaseURL *url.URL - rtspNonce string -} - func doExternalAuthentication( ur string, - path string, - publish bool, - credentials authCredentials, + accessRequest pathAccessRequest, ) error { enc, _ := json.Marshal(struct { IP string `json:"ip"` @@ -81,19 +64,19 @@ func doExternalAuthentication( Action string `json:"action"` Query string `json:"query"` }{ - IP: credentials.ip.String(), - User: credentials.user, - Password: credentials.pass, - Path: path, - Protocol: string(credentials.proto), - ID: credentials.id, + IP: accessRequest.ip.String(), + User: accessRequest.user, + Password: accessRequest.pass, + Path: accessRequest.name, + Protocol: string(accessRequest.proto), + ID: accessRequest.id, Action: func() string { - if publish { + if accessRequest.publish { return "publish" } return "read" }(), - Query: credentials.query, + Query: accessRequest.query, }) res, err := http.Post(ur, "application/json", bytes.NewReader(enc)) if err != nil { @@ -114,26 +97,22 @@ func doExternalAuthentication( func doAuthentication( externalAuthenticationURL string, rtspAuthMethods conf.AuthMethods, - pathName string, pathConf *conf.Path, - publish bool, - credentials authCredentials, + accessRequest pathAccessRequest, ) error { var rtspAuth headers.Authorization - if credentials.rtspRequest != nil { - err := rtspAuth.Unmarshal(credentials.rtspRequest.Header["Authorization"]) + if accessRequest.rtspRequest != nil { + err := rtspAuth.Unmarshal(accessRequest.rtspRequest.Header["Authorization"]) if err == nil && rtspAuth.Method == headers.AuthBasic { - credentials.user = rtspAuth.BasicUser - credentials.pass = rtspAuth.BasicPass + accessRequest.user = rtspAuth.BasicUser + accessRequest.pass = rtspAuth.BasicPass } } if externalAuthenticationURL != "" { err := doExternalAuthentication( externalAuthenticationURL, - pathName, - publish, - credentials, + accessRequest, ) if err != nil { return &errAuthentication{message: fmt.Sprintf("external authentication failed: %s", err)} @@ -144,7 +123,7 @@ func doAuthentication( var pathUser string var pathPass string - if publish { + if accessRequest.publish { pathIPs = pathConf.PublishIPs pathUser = string(pathConf.PublishUser) pathPass = string(pathConf.PublishPass) @@ -155,26 +134,26 @@ func doAuthentication( } if pathIPs != nil { - if !ipEqualOrInRange(credentials.ip, pathIPs) { - return &errAuthentication{message: fmt.Sprintf("IP %s not allowed", credentials.ip)} + if !ipEqualOrInRange(accessRequest.ip, pathIPs) { + return &errAuthentication{message: fmt.Sprintf("IP %s not allowed", accessRequest.ip)} } } if pathUser != "" { - if credentials.rtspRequest != nil && rtspAuth.Method == headers.AuthDigest { + if accessRequest.rtspRequest != nil && rtspAuth.Method == headers.AuthDigest { err := auth.Validate( - credentials.rtspRequest, + accessRequest.rtspRequest, pathUser, pathPass, - credentials.rtspBaseURL, + accessRequest.rtspBaseURL, rtspAuthMethods, "IPCAM", - credentials.rtspNonce) + accessRequest.rtspNonce) if err != nil { return &errAuthentication{message: err.Error()} } - } else if !checkCredential(pathUser, credentials.user) || - !checkCredential(pathPass, credentials.pass) { + } else if !checkCredential(pathUser, accessRequest.user) || + !checkCredential(pathPass, accessRequest.pass) { return &errAuthentication{message: "invalid credentials"} } } diff --git a/internal/core/hls_http_server.go b/internal/core/hls_http_server.go index f5fdc815..caab95fd 100644 --- a/internal/core/hls_http_server.go +++ b/internal/core/hls_http_server.go @@ -163,14 +163,14 @@ func (s *hlsHTTPServer) onRequest(ctx *gin.Context) { user, pass, hasCredentials := ctx.Request.BasicAuth() res := s.pathManager.getConfForPath(pathGetConfForPathReq{ - name: dir, - publish: false, - credentials: authCredentials{ - query: ctx.Request.URL.RawQuery, - ip: net.ParseIP(ctx.ClientIP()), - user: user, - pass: pass, - proto: authProtocolHLS, + accessRequest: pathAccessRequest{ + name: dir, + query: ctx.Request.URL.RawQuery, + publish: false, + ip: net.ParseIP(ctx.ClientIP()), + user: user, + pass: pass, + proto: authProtocolHLS, }, }) if res.err != nil { diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 61b71e46..9ce6ca1e 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -242,9 +242,11 @@ func (m *hlsMuxer) clearQueuedRequests() { func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { res := m.pathManager.addReader(pathAddReaderReq{ - author: m, - pathName: m.pathName, - skipAuth: true, + author: m, + accessRequest: pathAccessRequest{ + name: m.pathName, + skipAuth: true, + }, }) if res.err != nil { return res.err diff --git a/internal/core/mpegts.go b/internal/core/mpegts.go index a5045739..ab9f46cd 100644 --- a/internal/core/mpegts.go +++ b/internal/core/mpegts.go @@ -211,8 +211,11 @@ func mpegtsSetupRead(r *mpegts.Reader, stream **stream.Stream) ([]*description.M return medias, nil } -func mpegtsSetupWrite(stream *stream.Stream, - writer *asyncwriter.Writer, bw *bufio.Writer, sconn srt.Conn, +func mpegtsSetupWrite( + stream *stream.Stream, + writer *asyncwriter.Writer, + bw *bufio.Writer, + sconn srt.Conn, writeTimeout time.Duration, ) error { var w *mpegts.Writer diff --git a/internal/core/path.go b/internal/core/path.go index daee050a..de34f5b0 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -9,8 +9,10 @@ import ( "sync" "time" + "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/url" + "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/externalcmd" @@ -50,6 +52,23 @@ const ( pathOnDemandStateClosing ) +type pathAccessRequest struct { + name string + query string + publish bool + skipAuth bool + + // only if skipAuth = false + ip net.IP + user string + pass string + proto authProtocol + id *uuid.UUID + rtspRequest *base.Request + rtspBaseURL *url.URL + rtspNonce string +} + type pathSourceStaticSetReadyRes struct { stream *stream.Stream err error @@ -81,10 +100,8 @@ type pathGetConfForPathRes struct { } type pathGetConfForPathReq struct { - name string - publish bool - credentials authCredentials - res chan pathGetConfForPathRes + accessRequest pathAccessRequest + res chan pathGetConfForPathRes } type pathDescribeRes struct { @@ -95,10 +112,8 @@ type pathDescribeRes struct { } type pathDescribeReq struct { - pathName string - url *url.URL - credentials authCredentials - res chan pathDescribeRes + accessRequest pathAccessRequest + res chan pathDescribeRes } type pathAddReaderRes struct { @@ -108,11 +123,9 @@ type pathAddReaderRes struct { } type pathAddReaderReq struct { - author reader - pathName string - skipAuth bool - credentials authCredentials - res chan pathAddReaderRes + author reader + accessRequest pathAccessRequest + res chan pathAddReaderRes } type pathAddPublisherRes struct { @@ -121,11 +134,9 @@ type pathAddPublisherRes struct { } type pathAddPublisherReq struct { - author publisher - pathName string - skipAuth bool - credentials authCredentials - res chan pathAddPublisherRes + author publisher + accessRequest pathAccessRequest + res chan pathAddPublisherRes } type pathStartPublisherRes struct { @@ -183,14 +194,15 @@ type path struct { ctxCancel func() confMutex sync.RWMutex source source + publisherQuery string stream *stream.Stream recordAgent *record.Agent readyTime time.Time + onUnDemandHook func(string) + onNotReadyHook func() readers map[reader]struct{} describeRequestsOnHold []pathDescribeReq readerAddRequestsOnHold []pathAddReaderReq - onDemandCmd *externalcmd.Cmd - onReadyCmd *externalcmd.Cmd onDemandStaticSourceState pathOnDemandState onDemandStaticSourceReadyTimer *time.Timer onDemandStaticSourceCloseTimer *time.Timer @@ -358,9 +370,8 @@ func (pa *path) run() { } } - if pa.onDemandCmd != nil { - pa.onDemandCmd.Close() - pa.Log(logger.Info, "runOnDemand command stopped") + if pa.onUnDemandHook != nil { + pa.onUnDemandHook("path closed") } pa.Log(logger.Debug, "destroyed: %v", err) @@ -578,7 +589,7 @@ func (pa *path) doDescribe(req pathDescribeReq) { if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { - pa.onDemandPublisherStart() + pa.onDemandPublisherStart(req.accessRequest.query) } pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) return @@ -588,9 +599,9 @@ func (pa *path) doDescribe(req pathDescribeReq) { fallbackURL := func() string { if strings.HasPrefix(pa.conf.Fallback, "/") { ur := url.URL{ - Scheme: req.url.Scheme, - User: req.url.User, - Host: req.url.Host, + Scheme: req.accessRequest.rtspRequest.URL.Scheme, + User: req.accessRequest.rtspRequest.URL.User, + Host: req.accessRequest.rtspRequest.URL.Host, Path: pa.conf.Fallback, } return ur.String() @@ -631,6 +642,7 @@ func (pa *path) doAddPublisher(req pathAddPublisherReq) { } pa.source = req.author + pa.publisherQuery = req.accessRequest.query req.res <- pathAddPublisherRes{path: pa} } @@ -696,7 +708,7 @@ func (pa *path) doAddReader(req pathAddReaderReq) { if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { - pa.onDemandPublisherStart() + pa.onDemandPublisherStart(req.accessRequest.query) } pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req) return @@ -825,16 +837,8 @@ func (pa *path) onDemandStaticSourceStop(reason string) { pa.source.(*sourceStatic).stop(reason) } -func (pa *path) onDemandPublisherStart() { - pa.Log(logger.Info, "runOnDemand command started") - pa.onDemandCmd = externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnDemand, - pa.conf.RunOnDemandRestart, - pa.externalCmdEnv(), - func(err error) { - pa.Log(logger.Info, "runOnDemand command exited: %v", err) - }) +func (pa *path) onDemandPublisherStart(query string) { + pa.onUnDemandHook = publisherOnDemandHook(pa, query) pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) @@ -862,11 +866,8 @@ func (pa *path) onDemandPublisherStop(reason string) { pa.onDemandPublisherState = pathOnDemandStateInitial - if pa.onDemandCmd != nil { - pa.onDemandCmd.Close() - pa.onDemandCmd = nil - pa.Log(logger.Info, "runOnDemand command stopped: %s", reason) - } + pa.onUnDemandHook(reason) + pa.onUnDemandHook = nil } func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { @@ -887,22 +888,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error pa.readyTime = time.Now() - if pa.conf.RunOnReady != "" { - env := pa.externalCmdEnv() - desc := pa.source.apiSourceDescribe() - env["MTX_SOURCE_TYPE"] = desc.Type - env["MTX_SOURCE_ID"] = desc.ID - - pa.Log(logger.Info, "runOnReady command started") - pa.onReadyCmd = externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnReady, - pa.conf.RunOnReadyRestart, - env, - func(err error) { - pa.Log(logger.Info, "runOnReady command exited: %v", err) - }) - } + pa.onNotReadyHook = sourceOnReadyHook(pa) pa.parent.pathReady(pa) @@ -917,26 +903,7 @@ func (pa *path) setNotReady() { r.close() } - if pa.onReadyCmd != nil { - pa.onReadyCmd.Close() - pa.onReadyCmd = nil - pa.Log(logger.Info, "runOnReady command stopped") - } - - if pa.conf.RunOnNotReady != "" { - env := pa.externalCmdEnv() - desc := pa.source.apiSourceDescribe() - env["MTX_SOURCE_TYPE"] = desc.Type - env["MTX_SOURCE_ID"] = desc.ID - - pa.Log(logger.Info, "runOnNotReady command launched") - externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnNotReady, - false, - env, - nil) - } + pa.onNotReadyHook() if pa.recordAgent != nil { pa.recordAgent.Close() diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index e49d6fba..401c3bde 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -284,14 +284,14 @@ func (pm *pathManager) doPathNotReady(pa *path) { } func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) { - _, pathConf, _, err := getConfForPath(pm.pathConfs, req.name) + _, pathConf, _, err := getConfForPath(pm.pathConfs, req.accessRequest.name) if err != nil { req.res <- pathGetConfForPathRes{err: err} return } err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, - req.name, pathConf, req.publish, req.credentials) + pathConf, req.accessRequest) if err != nil { req.res <- pathGetConfForPathRes{err: err} return @@ -301,35 +301,37 @@ func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) { } func (pm *pathManager) doDescribe(req pathDescribeReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name) if err != nil { req.res <- pathDescribeRes{err: err} return } - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, + pathConf, req.accessRequest) if err != nil { req.res <- pathDescribeRes{err: err} return } // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + if _, ok := pm.paths[req.accessRequest.name]; !ok { + pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches) } - req.res <- pathDescribeRes{path: pm.paths[req.pathName]} + req.res <- pathDescribeRes{path: pm.paths[req.accessRequest.name]} } func (pm *pathManager) doAddReader(req pathAddReaderReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name) if err != nil { req.res <- pathAddReaderRes{err: err} return } - if !req.skipAuth { - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) + if !req.accessRequest.skipAuth { + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, + pathConf, req.accessRequest) if err != nil { req.res <- pathAddReaderRes{err: err} return @@ -337,22 +339,23 @@ func (pm *pathManager) doAddReader(req pathAddReaderReq) { } // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + if _, ok := pm.paths[req.accessRequest.name]; !ok { + pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches) } - req.res <- pathAddReaderRes{path: pm.paths[req.pathName]} + req.res <- pathAddReaderRes{path: pm.paths[req.accessRequest.name]} } func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.accessRequest.name) if err != nil { req.res <- pathAddPublisherRes{err: err} return } - if !req.skipAuth { - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) + if !req.accessRequest.skipAuth { + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, + pathConf, req.accessRequest) if err != nil { req.res <- pathAddPublisherRes{err: err} return @@ -360,11 +363,11 @@ func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) { } // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + if _, ok := pm.paths[req.accessRequest.name]; !ok { + pm.createPath(pathConfName, pathConf, req.accessRequest.name, pathMatches) } - req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]} + req.res <- pathAddPublisherRes{path: pm.paths[req.accessRequest.name]} } func (pm *pathManager) doAPIPathsList(req pathAPIPathsListReq) { diff --git a/internal/core/path_test.go b/internal/core/path_test.go index c07214a5..649b6a6e 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "testing" "time" @@ -25,11 +26,7 @@ import ( "github.com/bluenviron/mediamtx/internal/rtmp" ) -func TestPathRunOnDemand(t *testing.T) { - onDemandFile := filepath.Join(os.TempDir(), "ondemand") - - srcFile := filepath.Join(os.TempDir(), "ondemand.go") - err := os.WriteFile(srcFile, []byte(` +var runOnDemandSampleScript = ` package main import ( @@ -42,7 +39,9 @@ import ( ) func main() { - if os.Getenv("G1") != "on" { + if os.Getenv("MTX_PATH") != "ondemand" || + os.Getenv("MTX_QUERY") != "param=value" || + os.Getenv("G1") != "on" { panic("environment not set") } @@ -74,12 +73,19 @@ func main() { signal.Notify(c, syscall.SIGINT) <-c - err = os.WriteFile("`+onDemandFile+`", []byte(""), 0644) + err = os.WriteFile("ON_DEMAND_FILE", []byte(""), 0644) if err != nil { panic(err) } } -`), 0o644) +` + +func TestPathRunOnDemand(t *testing.T) { + onDemandFile := filepath.Join(os.TempDir(), "ondemand") + + srcFile := filepath.Join(os.TempDir(), "ondemand.go") + err := os.WriteFile(srcFile, + []byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemandFile)), 0o644) require.NoError(t, err) execFile := filepath.Join(os.TempDir(), "ondemand_cmd") @@ -115,7 +121,7 @@ func main() { br := bufio.NewReader(conn) if ca == "describe" || ca == "describe and setup" { - u, err := rtspurl.Parse("rtsp://localhost:8554/ondemand") + u, err := rtspurl.Parse("rtsp://localhost:8554/ondemand?param=value") require.NoError(t, err) byts, _ := base.Request{ @@ -138,7 +144,7 @@ func main() { require.NoError(t, err) control, _ = desc.MediaDescriptions[0].Attribute("control") } else { - control = "rtsp://localhost:8554/ondemand/" + control = "rtsp://localhost:8554/ondemand?param=value/" } if ca == "setup" || ca == "describe and setup" { @@ -259,15 +265,15 @@ func TestPathRunOnReady(t *testing.T) { "webrtc: no\n"+ "paths:\n"+ " test:\n"+ - " runOnReady: touch %s\n"+ - " runOnNotReady: touch %s\n", + " runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ + " runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", onReadyFile, onNotReadyFile)) require.Equal(t, true, ok) defer p.Close() c := gortsplib.Client{} err := c.StartRecording( - "rtsp://localhost:8554/test", + "rtsp://localhost:8554/test?query=value", &description.Session{Medias: []*description.Media{testMediaH264}}) require.NoError(t, err) defer c.Close() @@ -275,11 +281,13 @@ func TestPathRunOnReady(t *testing.T) { time.Sleep(500 * time.Millisecond) }() - _, err := os.Stat(onReadyFile) + byts, err := os.ReadFile(onReadyFile) require.NoError(t, err) + require.Equal(t, "test query=value\n", string(byts)) - _, err = os.Stat(onNotReadyFile) + byts, err = os.ReadFile(onNotReadyFile) require.NoError(t, err) + require.Equal(t, "test query=value\n", string(byts)) } func TestPathRunOnRead(t *testing.T) { @@ -295,8 +303,8 @@ func TestPathRunOnRead(t *testing.T) { p, ok := newInstance(fmt.Sprintf( "paths:\n"+ " test:\n"+ - " runOnRead: touch %s\n"+ - " runOnUnread: touch %s\n", + " runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ + " runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", onReadFile, onUnreadFile)) require.Equal(t, true, ok) defer p.Close() @@ -312,7 +320,7 @@ func TestPathRunOnRead(t *testing.T) { case "rtsp": reader := gortsplib.Client{} - u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/test") + u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/test?query=value") require.NoError(t, err) err = reader.Start(u.Scheme, u.Host) @@ -329,7 +337,7 @@ func TestPathRunOnRead(t *testing.T) { require.NoError(t, err) case "rtmp": - u, err := url.Parse("rtmp://127.0.0.1:1935/test") + u, err := url.Parse("rtmp://127.0.0.1:1935/test?query=value") require.NoError(t, err) nconn, err := net.Dial("tcp", u.Host) @@ -356,18 +364,28 @@ func TestPathRunOnRead(t *testing.T) { case "webrtc": hc := &http.Client{Transport: &http.Transport{}} - c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep", false) + c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep?query=value", false) defer c.close() } time.Sleep(500 * time.Millisecond) }() - _, err := os.Stat(onReadFile) + byts, err := os.ReadFile(onReadFile) require.NoError(t, err) + if ca == "srt" { + require.Equal(t, "test \n", string(byts)) + } else { + require.Equal(t, "test query=value\n", string(byts)) + } - _, err = os.Stat(onUnreadFile) + byts, err = os.ReadFile(onUnreadFile) require.NoError(t, err) + if ca == "srt" { + require.Equal(t, "test \n", string(byts)) + } else { + require.Equal(t, "test query=value\n", string(byts)) + } }) } } diff --git a/internal/core/publisher.go b/internal/core/publisher.go index c76f8d0f..0fcd3e3c 100644 --- a/internal/core/publisher.go +++ b/internal/core/publisher.go @@ -1,7 +1,39 @@ package core +import ( + "github.com/bluenviron/mediamtx/internal/externalcmd" + "github.com/bluenviron/mediamtx/internal/logger" +) + // publisher is an entity that can publish a stream. type publisher interface { source close() } + +func publisherOnDemandHook(path *path, query string) func(string) { + var onDemandCmd *externalcmd.Cmd + + if path.conf.RunOnDemand != "" { + env := path.externalCmdEnv() + env["MTX_QUERY"] = query + + path.Log(logger.Info, "runOnDemand command started") + + onDemandCmd = externalcmd.NewCmd( + path.externalCmdPool, + path.conf.RunOnDemand, + path.conf.RunOnDemandRestart, + env, + func(err error) { + path.Log(logger.Info, "runOnDemand command exited: %v", err) + }) + } + + return func(reason string) { + if onDemandCmd != nil { + onDemandCmd.Close() + path.Log(logger.Info, "runOnDemand command stopped: %v", reason) + } + } +} diff --git a/internal/core/reader.go b/internal/core/reader.go index c97a0178..7c6de09b 100644 --- a/internal/core/reader.go +++ b/internal/core/reader.go @@ -2,6 +2,9 @@ package core import ( "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/externalcmd" + "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" ) @@ -14,3 +17,52 @@ type reader interface { func readerMediaInfo(r *asyncwriter.Writer, stream *stream.Stream) string { return mediaInfo(stream.MediasForReader(r)) } + +func readerOnReadHook( + externalCmdPool *externalcmd.Pool, + pathConf *conf.Path, + path *path, + reader apiPathSourceOrReader, + query string, + l logger.Writer, +) func() { + var env externalcmd.Environment + var onReadCmd *externalcmd.Cmd + + if pathConf.RunOnRead != "" || pathConf.RunOnUnread != "" { + env = path.externalCmdEnv() + desc := reader + env["MTX_QUERY"] = query + env["MTX_READER_TYPE"] = desc.Type + env["MTX_READER_ID"] = desc.ID + } + + if pathConf.RunOnRead != "" { + l.Log(logger.Info, "runOnRead command started") + onReadCmd = externalcmd.NewCmd( + externalCmdPool, + pathConf.RunOnRead, + pathConf.RunOnReadRestart, + env, + func(err error) { + l.Log(logger.Info, "runOnRead command exited: %v", err) + }) + } + + return func() { + if onReadCmd != nil { + onReadCmd.Close() + l.Log(logger.Info, "runOnRead command stopped") + } + + if pathConf.RunOnUnread != "" { + l.Log(logger.Info, "runOnUnread command launched") + externalcmd.NewCmd( + externalCmdPool, + pathConf.RunOnUnread, + false, + env, + nil) + } + } +} diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 9de46c03..dbc4b232 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -201,9 +201,9 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { pathName, query, rawQuery := pathNameAndQuery(u) res := c.pathManager.addReader(pathAddReaderReq{ - author: c, - pathName: pathName, - credentials: authCredentials{ + author: c, + accessRequest: pathAccessRequest{ + name: pathName, query: rawQuery, ip: c.ip(), user: query.Get("user"), @@ -255,43 +255,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { pathConf := res.path.safeConf() - if pathConf.RunOnRead != "" { - env := res.path.externalCmdEnv() - desc := c.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - c.Log(logger.Info, "runOnRead command started") - onReadCmd := externalcmd.NewCmd( - c.externalCmdPool, - pathConf.RunOnRead, - pathConf.RunOnReadRestart, - env, - func(err error) { - c.Log(logger.Info, "runOnRead command exited: %v", err) - }) - defer func() { - onReadCmd.Close() - c.Log(logger.Info, "runOnRead command stopped") - }() - } - - if pathConf.RunOnUnread != "" { - defer func() { - env := res.path.externalCmdEnv() - desc := c.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - c.Log(logger.Info, "runOnUnread command launched") - externalcmd.NewCmd( - c.externalCmdPool, - pathConf.RunOnUnread, - false, - res.path.externalCmdEnv(), - nil) - }() - } + onUnreadHook := readerOnReadHook( + c.externalCmdPool, + pathConf, + res.path, + c.apiReaderDescribe(), + rawQuery, + c) + defer onUnreadHook() var err error w, err = rtmp.NewWriter(conn, videoFormat, audioFormat) @@ -460,15 +431,16 @@ func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error { pathName, query, rawQuery := pathNameAndQuery(u) res := c.pathManager.addPublisher(pathAddPublisherReq{ - author: c, - pathName: pathName, - credentials: authCredentials{ - query: rawQuery, - ip: c.ip(), - user: query.Get("user"), - pass: query.Get("pass"), - proto: authProtocolRTMP, - id: &c.uuid, + author: c, + accessRequest: pathAccessRequest{ + name: pathName, + query: rawQuery, + publish: true, + ip: c.ip(), + user: query.Get("user"), + pass: query.Get("pass"), + proto: authProtocolRTMP, + id: &c.uuid, }, }) diff --git a/internal/core/rtsp_conn.go b/internal/core/rtsp_conn.go index 45e0c44d..6bae8416 100644 --- a/internal/core/rtsp_conn.go +++ b/internal/core/rtsp_conn.go @@ -155,9 +155,8 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, } res := c.pathManager.describe(pathDescribeReq{ - pathName: ctx.Path, - url: ctx.Request.URL, - credentials: authCredentials{ + accessRequest: pathAccessRequest{ + name: ctx.Path, query: ctx.Query, ip: c.ip(), proto: authProtocolRTSP, diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 838bb704..ec4221af 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -44,7 +44,7 @@ type rtspSession struct { created time.Time path *path stream *stream.Stream - onReadCmd *externalcmd.Cmd // read + onUnreadHook func() mutex sync.Mutex state gortsplib.ServerSessionState transport *gortsplib.Transport @@ -96,32 +96,10 @@ func (s *rtspSession) Log(level logger.Level, format string, args ...interface{} s.parent.Log(level, "[session %s] "+format, append([]interface{}{id}, args...)...) } -func (s *rtspSession) onUnread() { - if s.onReadCmd != nil { - s.Log(logger.Info, "runOnRead command stopped") - s.onReadCmd.Close() - } - - if s.path.conf.RunOnUnread != "" { - env := s.path.externalCmdEnv() - desc := s.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - s.Log(logger.Info, "runOnUnread command launched") - externalcmd.NewCmd( - s.externalCmdPool, - s.path.conf.RunOnUnread, - false, - env, - nil) - } -} - // onClose is called by rtspServer. func (s *rtspSession) onClose(err error) { if s.session.State() == gortsplib.ServerSessionStatePlay { - s.onUnread() + s.onUnreadHook() } switch s.session.State() { @@ -158,10 +136,11 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno } res := s.pathManager.addPublisher(pathAddPublisherReq{ - author: s, - pathName: ctx.Path, - credentials: authCredentials{ + author: s, + accessRequest: pathAccessRequest{ + name: ctx.Path, query: ctx.Query, + publish: true, ip: c.ip(), proto: authProtocolRTSP, id: &c.uuid, @@ -243,9 +222,9 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt } res := s.pathManager.addReader(pathAddReaderReq{ - author: s, - pathName: ctx.Path, - credentials: authCredentials{ + author: s, + accessRequest: pathAccessRequest{ + name: ctx.Path, query: ctx.Query, ip: c.ip(), proto: authProtocolRTSP, @@ -312,22 +291,14 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons pathConf := s.path.safeConf() - if pathConf.RunOnRead != "" { - env := s.path.externalCmdEnv() - desc := s.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - s.Log(logger.Info, "runOnRead command started") - s.onReadCmd = externalcmd.NewCmd( - s.externalCmdPool, - pathConf.RunOnRead, - pathConf.RunOnReadRestart, - env, - func(err error) { - s.Log(logger.Info, "runOnRead command exited: %v", err) - }) - } + s.onUnreadHook = readerOnReadHook( + s.externalCmdPool, + pathConf, + s.path, + s.apiReaderDescribe(), + s.session.SetuppedQuery(), + s, + ) s.mutex.Lock() s.state = gortsplib.ServerSessionStatePlay @@ -386,7 +357,7 @@ func (s *rtspSession) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Res func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { switch s.session.State() { case gortsplib.ServerSessionStatePlay: - s.onUnread() + s.onUnreadHook() s.mutex.Lock() s.state = gortsplib.ServerSessionStatePrePlay diff --git a/internal/core/source.go b/internal/core/source.go index f27ada71..cd181fc3 100644 --- a/internal/core/source.go +++ b/internal/core/source.go @@ -6,6 +6,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -46,3 +47,45 @@ func mediaInfo(medias []*description.Media) string { }(), strings.Join(mediasDescription(medias), ", ")) } + +func sourceOnReadyHook(path *path) func() { + var env externalcmd.Environment + var onReadyCmd *externalcmd.Cmd + + if path.conf.RunOnReady != "" { + env = path.externalCmdEnv() + desc := path.source.apiSourceDescribe() + env["MTX_QUERY"] = path.publisherQuery + env["MTX_SOURCE_TYPE"] = desc.Type + env["MTX_SOURCE_ID"] = desc.ID + } + + if path.conf.RunOnReady != "" { + path.Log(logger.Info, "runOnReady command started") + onReadyCmd = externalcmd.NewCmd( + path.externalCmdPool, + path.conf.RunOnReady, + path.conf.RunOnReadyRestart, + env, + func(err error) { + path.Log(logger.Info, "runOnReady command exited: %v", err) + }) + } + + return func() { + if onReadyCmd != nil { + onReadyCmd.Close() + path.Log(logger.Info, "runOnReady command stopped") + } + + if path.conf.RunOnNotReady != "" { + path.Log(logger.Info, "runOnNotReady command launched") + externalcmd.NewCmd( + path.externalCmdPool, + path.conf.RunOnNotReady, + false, + env, + nil) + } + } +} diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index aa10a978..19a92ab5 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -207,14 +207,15 @@ func (c *srtConn) runInner2(req srtNewConnReq) (bool, error) { func (c *srtConn) runPublish(req srtNewConnReq, pathName string, user string, pass string) (bool, error) { res := c.pathManager.addPublisher(pathAddPublisherReq{ - author: c, - pathName: pathName, - credentials: authCredentials{ - ip: c.ip(), - user: user, - pass: pass, - proto: authProtocolSRT, - id: &c.uuid, + author: c, + accessRequest: pathAccessRequest{ + name: pathName, + ip: c.ip(), + publish: true, + user: user, + pass: pass, + proto: authProtocolSRT, + id: &c.uuid, }, }) @@ -304,9 +305,9 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass string) (bool, error) { res := c.pathManager.addReader(pathAddReaderReq{ - author: c, - pathName: pathName, - credentials: authCredentials{ + author: c, + accessRequest: pathAccessRequest{ + name: pathName, ip: c.ip(), user: user, pass: pass, @@ -360,43 +361,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass pathConf := res.path.safeConf() - if pathConf.RunOnRead != "" { - env := res.path.externalCmdEnv() - desc := c.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - c.Log(logger.Info, "runOnRead command started") - onReadCmd := externalcmd.NewCmd( - c.externalCmdPool, - pathConf.RunOnRead, - pathConf.RunOnReadRestart, - env, - func(err error) { - c.Log(logger.Info, "runOnRead command exited: %v", err) - }) - defer func() { - onReadCmd.Close() - c.Log(logger.Info, "runOnRead command stopped") - }() - } - - if pathConf.RunOnUnread != "" { - defer func() { - env := res.path.externalCmdEnv() - desc := c.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - c.Log(logger.Info, "runOnUnread command launched") - externalcmd.NewCmd( - c.externalCmdPool, - pathConf.RunOnUnread, - false, - env, - nil) - }() - } + onUnreadHook := readerOnReadHook( + c.externalCmdPool, + pathConf, + res.path, + c.apiReaderDescribe(), + "", + c, + ) + defer onUnreadHook() // disable read deadline sconn.SetReadDeadline(time.Time{}) diff --git a/internal/core/webrtc_http_server.go b/internal/core/webrtc_http_server.go index 31bdd36f..d874de13 100644 --- a/internal/core/webrtc_http_server.go +++ b/internal/core/webrtc_http_server.go @@ -171,14 +171,14 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { // if request doesn't belong to a session, check authentication here if !isWHIPorWHEP || ctx.Request.Method == http.MethodOptions { res := s.pathManager.getConfForPath(pathGetConfForPathReq{ - name: dir, - publish: publish, - credentials: authCredentials{ - query: ctx.Request.URL.RawQuery, - ip: net.ParseIP(ip), - user: user, - pass: pass, - proto: authProtocolWebRTC, + accessRequest: pathAccessRequest{ + name: dir, + query: ctx.Request.URL.RawQuery, + publish: publish, + ip: net.ParseIP(ip), + user: user, + pass: pass, + proto: authProtocolWebRTC, }, }) if res.err != nil { diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index a194db20..49f97b76 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -277,15 +277,16 @@ func (s *webRTCSession) runPublish() (int, error) { ip, _, _ := net.SplitHostPort(s.req.remoteAddr) res := s.pathManager.addPublisher(pathAddPublisherReq{ - author: s, - pathName: s.req.pathName, - credentials: authCredentials{ - query: s.req.query, - ip: net.ParseIP(ip), - user: s.req.user, - pass: s.req.pass, - proto: authProtocolWebRTC, - id: &s.uuid, + author: s, + accessRequest: pathAccessRequest{ + name: s.req.pathName, + query: s.req.query, + publish: true, + ip: net.ParseIP(ip), + user: s.req.user, + pass: s.req.pass, + proto: authProtocolWebRTC, + id: &s.uuid, }, }) if res.err != nil { @@ -418,9 +419,9 @@ func (s *webRTCSession) runRead() (int, error) { ip, _, _ := net.SplitHostPort(s.req.remoteAddr) res := s.pathManager.addReader(pathAddReaderReq{ - author: s, - pathName: s.req.pathName, - credentials: authCredentials{ + author: s, + accessRequest: pathAccessRequest{ + name: s.req.pathName, query: s.req.query, ip: net.ParseIP(ip), user: s.req.user, @@ -521,43 +522,15 @@ func (s *webRTCSession) runRead() (int, error) { pathConf := res.path.safeConf() - if pathConf.RunOnRead != "" { - env := res.path.externalCmdEnv() - desc := s.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - s.Log(logger.Info, "runOnRead command started") - onReadCmd := externalcmd.NewCmd( - s.externalCmdPool, - pathConf.RunOnRead, - pathConf.RunOnReadRestart, - env, - func(err error) { - s.Log(logger.Info, "runOnRead command exited: %v", err) - }) - defer func() { - onReadCmd.Close() - s.Log(logger.Info, "runOnRead command stopped") - }() - } - - if pathConf.RunOnUnread != "" { - defer func() { - env := res.path.externalCmdEnv() - desc := s.apiReaderDescribe() - env["MTX_READER_TYPE"] = desc.Type - env["MTX_READER_ID"] = desc.ID - - s.Log(logger.Info, "runOnUnread command launched") - externalcmd.NewCmd( - s.externalCmdPool, - pathConf.RunOnUnread, - false, - env, - nil) - }() - } + onUnreadHook := readerOnReadHook( + s.externalCmdPool, + pathConf, + res.path, + s.apiReaderDescribe(), + s.req.query, + s, + ) + defer onUnreadHook() writer.Start() diff --git a/mediamtx.yml b/mediamtx.yml index 21514f0a..1aa0ada4 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -473,6 +473,7 @@ pathDefaults: # This is terminated with SIGINT when the path is not requested anymore. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by first reader) # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. @@ -491,6 +492,7 @@ pathDefaults: # This is terminated with SIGINT when the stream is not ready anymore. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by publisher) # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. @@ -507,6 +509,7 @@ pathDefaults: # This is terminated with SIGINT when a client stops reading. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_QUERY: query parameters (passed by reader) # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression.