Browse Source

move hooks into dedicated package (#2746)

pull/2751/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
ce45498769
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      internal/core/conn.go
  2. 208
      internal/core/hooks.go
  3. 25
      internal/core/path.go
  4. 94
      internal/core/rtmp_conn.go
  5. 35
      internal/core/rtsp_conn.go
  6. 19
      internal/core/rtsp_session.go
  7. 101
      internal/core/srt_conn.go
  8. 19
      internal/core/webrtc_session.go
  9. 2
      internal/hooks/hooks.go
  10. 65
      internal/hooks/on_connect.go
  11. 57
      internal/hooks/on_demand.go
  12. 39
      internal/hooks/on_init.go
  13. 61
      internal/hooks/on_read.go
  14. 60
      internal/hooks/on_ready.go

44
internal/core/conn.go

@ -1,44 +0,0 @@
package core
import (
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
type conn struct {
rtspAddress string
runOnConnect string
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
logger logger.Writer
onDisconnectHook func()
}
func newConn(
rtspAddress string,
runOnConnect string,
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
logger logger.Writer,
) *conn {
return &conn{
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
logger: logger,
}
}
func (c *conn) open(desc defs.APIPathSourceOrReader) {
c.onDisconnectHook = onConnectHook(c, desc)
}
func (c *conn) close() {
c.onDisconnectHook()
}

208
internal/core/hooks.go

@ -1,208 +0,0 @@
package core
import (
"net"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
func onInitHook(path *path) func() {
var onInitCmd *externalcmd.Cmd
if path.conf.RunOnInit != "" {
path.Log(logger.Info, "runOnInit command started")
onInitCmd = externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnInit,
path.conf.RunOnInitRestart,
path.externalCmdEnv(),
func(err error) {
path.Log(logger.Info, "runOnInit command exited: %v", err)
})
}
return func() {
if onInitCmd != nil {
onInitCmd.Close()
path.Log(logger.Info, "runOnInit command stopped")
}
}
}
func onConnectHook(c *conn, desc defs.APIPathSourceOrReader) func() {
var env externalcmd.Environment
var onConnectCmd *externalcmd.Cmd
if c.runOnConnect != "" || c.runOnDisconnect != "" {
_, port, _ := net.SplitHostPort(c.rtspAddress)
env = externalcmd.Environment{
"RTSP_PORT": port,
"MTX_CONN_TYPE": desc.Type,
"MTX_CONN_ID": desc.ID,
}
}
if c.runOnConnect != "" {
c.logger.Log(logger.Info, "runOnConnect command started")
onConnectCmd = externalcmd.NewCmd(
c.externalCmdPool,
c.runOnConnect,
c.runOnConnectRestart,
env,
func(err error) {
c.logger.Log(logger.Info, "runOnConnect command exited: %v", err)
})
}
return func() {
if onConnectCmd != nil {
onConnectCmd.Close()
c.logger.Log(logger.Info, "runOnConnect command stopped")
}
if c.runOnDisconnect != "" {
c.logger.Log(logger.Info, "runOnDisconnect command launched")
externalcmd.NewCmd(
c.externalCmdPool,
c.runOnDisconnect,
false,
env,
nil)
}
}
}
func onDemandHook(path *path, query string) func(string) {
var env externalcmd.Environment
var onDemandCmd *externalcmd.Cmd
if path.conf.RunOnDemand != "" || path.conf.RunOnUnDemand != "" {
env = path.externalCmdEnv()
env["MTX_QUERY"] = query
}
if path.conf.RunOnDemand != "" {
path.Log(logger.Info, "runOnDemand command started")
onDemandCmd = externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnDemand,
path.conf.RunOnDemandRestart,
env,
func(err error) {
path.Log(logger.Info, "runOnDemand command exited: %v", err)
})
}
return func(reason string) {
if onDemandCmd != nil {
onDemandCmd.Close()
path.Log(logger.Info, "runOnDemand command stopped: %v", reason)
}
if path.conf.RunOnUnDemand != "" {
path.Log(logger.Info, "runOnUnDemand command launched")
externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnUnDemand,
false,
env,
nil)
}
}
}
func onReadyHook(path *path) func() {
var env externalcmd.Environment
var onReadyCmd *externalcmd.Cmd
if path.conf.RunOnReady != "" || path.conf.RunOnNotReady != "" {
env = path.externalCmdEnv()
desc := path.source.APISourceDescribe()
env["MTX_QUERY"] = path.publisherQuery
env["MTX_SOURCE_TYPE"] = desc.Type
env["MTX_SOURCE_ID"] = desc.ID
}
if path.conf.RunOnReady != "" {
path.Log(logger.Info, "runOnReady command started")
onReadyCmd = externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnReady,
path.conf.RunOnReadyRestart,
env,
func(err error) {
path.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
return func() {
if onReadyCmd != nil {
onReadyCmd.Close()
path.Log(logger.Info, "runOnReady command stopped")
}
if path.conf.RunOnNotReady != "" {
path.Log(logger.Info, "runOnNotReady command launched")
externalcmd.NewCmd(
path.externalCmdPool,
path.conf.RunOnNotReady,
false,
env,
nil)
}
}
}
func onReadHook(
externalCmdPool *externalcmd.Pool,
pathConf *conf.Path,
path *path,
reader defs.APIPathSourceOrReader,
query string,
l logger.Writer,
) func() {
var env externalcmd.Environment
var onReadCmd *externalcmd.Cmd
if pathConf.RunOnRead != "" || pathConf.RunOnUnread != "" {
env = path.externalCmdEnv()
desc := reader
env["MTX_QUERY"] = query
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
}
if pathConf.RunOnRead != "" {
l.Log(logger.Info, "runOnRead command started")
onReadCmd = externalcmd.NewCmd(
externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
env,
func(err error) {
l.Log(logger.Info, "runOnRead command exited: %v", err)
})
}
return func() {
if onReadCmd != nil {
onReadCmd.Close()
l.Log(logger.Info, "runOnRead command stopped")
}
if pathConf.RunOnUnread != "" {
l.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
externalCmdPool,
pathConf.RunOnUnread,
false,
env,
nil)
}
}
}

25
internal/core/path.go

@ -16,6 +16,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -303,7 +304,12 @@ func (pa *path) run() {
} }
} }
onUnInitHook := onInitHook(pa) onUnInitHook := hooks.OnInit(hooks.OnInitParams{
Logger: pa,
ExternalCmdPool: pa.externalCmdPool,
Conf: pa.conf,
ExternalCmdEnv: pa.externalCmdEnv(),
})
err := pa.runInner() err := pa.runInner()
@ -789,7 +795,13 @@ func (pa *path) onDemandStaticSourceStop(reason string) {
} }
func (pa *path) onDemandPublisherStart(query string) { func (pa *path) onDemandPublisherStart(query string) {
pa.onUnDemandHook = onDemandHook(pa, query) pa.onUnDemandHook = hooks.OnDemand(hooks.OnDemandParams{
Logger: pa,
ExternalCmdPool: pa.externalCmdPool,
Conf: pa.conf,
ExternalCmdEnv: pa.externalCmdEnv(),
Query: query,
})
pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
@ -834,7 +846,14 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
pa.readyTime = time.Now() pa.readyTime = time.Now()
pa.onNotReadyHook = onReadyHook(pa) pa.onNotReadyHook = hooks.OnReady(hooks.OnReadyParams{
Logger: pa,
ExternalCmdPool: pa.externalCmdPool,
Conf: pa.conf,
ExternalCmdEnv: pa.externalCmdEnv(),
Desc: pa.source.APISourceDescribe(),
Query: pa.publisherQuery,
})
pa.parent.pathReady(pa) pa.parent.pathReady(pa)

94
internal/core/rtmp_conn.go

@ -21,6 +21,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp" "github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -57,17 +58,19 @@ type rtmpConnParent interface {
} }
type rtmpConn struct { type rtmpConn struct {
*conn isTLS bool
rtspAddress string
isTLS bool readTimeout conf.StringDuration
readTimeout conf.StringDuration writeTimeout conf.StringDuration
writeTimeout conf.StringDuration writeQueueSize int
writeQueueSize int runOnConnect string
wg *sync.WaitGroup runOnConnectRestart bool
nconn net.Conn runOnDisconnect string
externalCmdPool *externalcmd.Pool wg *sync.WaitGroup
pathManager rtmpConnPathManager nconn net.Conn
parent rtmpConnParent externalCmdPool *externalcmd.Pool
pathManager rtmpConnPathManager
parent rtmpConnParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@ -98,30 +101,25 @@ func newRTMPConn(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
c := &rtmpConn{ c := &rtmpConn{
isTLS: isTLS, isTLS: isTLS,
readTimeout: readTimeout, rtspAddress: rtspAddress,
writeTimeout: writeTimeout, readTimeout: readTimeout,
writeQueueSize: writeQueueSize, writeTimeout: writeTimeout,
wg: wg, writeQueueSize: writeQueueSize,
nconn: nconn, runOnConnect: runOnConnect,
externalCmdPool: externalCmdPool, runOnConnectRestart: runOnConnectRestart,
pathManager: pathManager, runOnDisconnect: runOnDisconnect,
parent: parent, wg: wg,
ctx: ctx, nconn: nconn,
ctxCancel: ctxCancel, externalCmdPool: externalCmdPool,
uuid: uuid.New(), pathManager: pathManager,
created: time.Now(), parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
uuid: uuid.New(),
created: time.Now(),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
c.wg.Add(1) c.wg.Add(1)
@ -149,9 +147,16 @@ func (c *rtmpConn) ip() net.IP {
func (c *rtmpConn) run() { //nolint:dupl func (c *rtmpConn) run() { //nolint:dupl
defer c.wg.Done() defer c.wg.Done()
desc := c.apiReaderDescribe() onDisconnectHook := hooks.OnConnect(hooks.OnConnectParams{
c.conn.open(desc) Logger: c,
defer c.conn.close() ExternalCmdPool: c.externalCmdPool,
RunOnConnect: c.runOnConnect,
RunOnConnectRestart: c.runOnConnectRestart,
RunOnDisconnect: c.runOnDisconnect,
RTSPAddress: c.rtspAddress,
Desc: c.apiReaderDescribe(),
})
defer onDisconnectHook()
err := c.runInner() err := c.runInner()
@ -254,15 +259,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
c.Log(logger.Info, "is reading from path '%s', %s", c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, readerMediaInfo(writer, res.stream)) res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf() onUnreadHook := hooks.OnRead(hooks.OnReadParams{
Logger: c,
onUnreadHook := onReadHook( ExternalCmdPool: c.externalCmdPool,
c.externalCmdPool, Conf: res.path.safeConf(),
pathConf, ExternalCmdEnv: res.path.externalCmdEnv(),
res.path, Reader: c.APISourceDescribe(),
c.apiReaderDescribe(), Query: rawQuery,
rawQuery, })
c)
defer onUnreadHook() defer onUnreadHook()
var err error var err error

35
internal/core/rtsp_conn.go

@ -14,6 +14,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -28,8 +29,6 @@ type rtspConnParent interface {
} }
type rtspConn struct { type rtspConn struct {
*conn
isTLS bool isTLS bool
rtspAddress string rtspAddress string
authMethods []headers.AuthMethod authMethods []headers.AuthMethod
@ -38,10 +37,11 @@ type rtspConn struct {
rconn *gortsplib.ServerConn rconn *gortsplib.ServerConn
parent rtspConnParent parent rtspConnParent
uuid uuid.UUID uuid uuid.UUID
created time.Time created time.Time
authNonce string onDisconnectHook func()
authFailures int authNonce string
authFailures int
} }
func newRTSPConn( func newRTSPConn(
@ -69,18 +69,9 @@ func newRTSPConn(
created: time.Now(), created: time.Now(),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
c.conn.open(defs.APIPathSourceOrReader{ desc := defs.APIPathSourceOrReader{
Type: func() string { Type: func() string {
if isTLS { if isTLS {
return "rtspsConn" return "rtspsConn"
@ -88,6 +79,16 @@ func newRTSPConn(
return "rtspConn" return "rtspConn"
}(), }(),
ID: c.uuid.String(), ID: c.uuid.String(),
}
c.onDisconnectHook = hooks.OnConnect(hooks.OnConnectParams{
Logger: c,
ExternalCmdPool: externalCmdPool,
RunOnConnect: runOnConnect,
RunOnConnectRestart: runOnConnectRestart,
RunOnDisconnect: runOnDisconnect,
RTSPAddress: rtspAddress,
Desc: desc,
}) })
return c return c
@ -114,7 +115,7 @@ func (c *rtspConn) ip() net.IP {
func (c *rtspConn) onClose(err error) { func (c *rtspConn) onClose(err error) {
c.Log(logger.Info, "closed: %v", err) c.Log(logger.Info, "closed: %v", err)
c.conn.close() c.onDisconnectHook()
} }
// onRequest is called by rtspServer. // onRequest is called by rtspServer.

19
internal/core/rtsp_session.go

@ -16,6 +16,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
) )
@ -289,16 +290,14 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
s.session.SetuppedTransport(), s.session.SetuppedTransport(),
mediaInfo(s.session.SetuppedMedias())) mediaInfo(s.session.SetuppedMedias()))
pathConf := s.path.safeConf() s.onUnreadHook = hooks.OnRead(hooks.OnReadParams{
Logger: s,
s.onUnreadHook = onReadHook( ExternalCmdPool: s.externalCmdPool,
s.externalCmdPool, Conf: s.path.safeConf(),
pathConf, ExternalCmdEnv: s.path.externalCmdEnv(),
s.path, Reader: s.apiReaderDescribe(),
s.apiReaderDescribe(), Query: s.session.SetuppedQuery(),
s.session.SetuppedQuery(), })
s,
)
s.mutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay s.state = gortsplib.ServerSessionStatePlay

101
internal/core/srt_conn.go

@ -19,6 +19,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts" "github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -59,18 +60,19 @@ type srtConnParent interface {
} }
type srtConn struct { type srtConn struct {
*conn rtspAddress string
readTimeout conf.StringDuration
rtspAddress string writeTimeout conf.StringDuration
readTimeout conf.StringDuration writeQueueSize int
writeTimeout conf.StringDuration udpMaxPayloadSize int
writeQueueSize int connReq srt.ConnRequest
udpMaxPayloadSize int runOnConnect string
connReq srt.ConnRequest runOnConnectRestart bool
wg *sync.WaitGroup runOnDisconnect string
externalCmdPool *externalcmd.Pool wg *sync.WaitGroup
pathManager srtConnPathManager externalCmdPool *externalcmd.Pool
parent srtConnParent pathManager srtConnPathManager
parent srtConnParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@ -104,33 +106,27 @@ func newSRTConn(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
c := &srtConn{ c := &srtConn{
rtspAddress: rtspAddress, rtspAddress: rtspAddress,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize, writeQueueSize: writeQueueSize,
udpMaxPayloadSize: udpMaxPayloadSize, udpMaxPayloadSize: udpMaxPayloadSize,
connReq: connReq, connReq: connReq,
wg: wg, runOnConnect: runOnConnect,
externalCmdPool: externalCmdPool, runOnConnectRestart: runOnConnectRestart,
pathManager: pathManager, runOnDisconnect: runOnDisconnect,
parent: parent, wg: wg,
ctx: ctx, externalCmdPool: externalCmdPool,
ctxCancel: ctxCancel, pathManager: pathManager,
created: time.Now(), parent: parent,
uuid: uuid.New(), ctx: ctx,
chNew: make(chan srtNewConnReq), ctxCancel: ctxCancel,
chSetConn: make(chan srt.Conn), created: time.Now(),
uuid: uuid.New(),
chNew: make(chan srtNewConnReq),
chSetConn: make(chan srt.Conn),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
c.wg.Add(1) c.wg.Add(1)
@ -154,9 +150,16 @@ func (c *srtConn) ip() net.IP {
func (c *srtConn) run() { //nolint:dupl func (c *srtConn) run() { //nolint:dupl
defer c.wg.Done() defer c.wg.Done()
desc := c.apiReaderDescribe() onDisconnectHook := hooks.OnConnect(hooks.OnConnectParams{
c.conn.open(desc) Logger: c,
defer c.conn.close() ExternalCmdPool: c.externalCmdPool,
RunOnConnect: c.runOnConnect,
RunOnConnectRestart: c.runOnConnectRestart,
RunOnDisconnect: c.runOnDisconnect,
RTSPAddress: c.rtspAddress,
Desc: c.apiReaderDescribe(),
})
defer onDisconnectHook()
err := c.runInner() err := c.runInner()
@ -373,16 +376,14 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
c.Log(logger.Info, "is reading from path '%s', %s", c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, readerMediaInfo(writer, res.stream)) res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf() onUnreadHook := hooks.OnRead(hooks.OnReadParams{
Logger: c,
onUnreadHook := onReadHook( ExternalCmdPool: c.externalCmdPool,
c.externalCmdPool, Conf: res.path.safeConf(),
pathConf, ExternalCmdEnv: res.path.externalCmdEnv(),
res.path, Reader: c.apiReaderDescribe(),
c.apiReaderDescribe(), Query: query,
query, })
c,
)
defer onUnreadHook() defer onUnreadHook()
// disable read deadline // disable read deadline

19
internal/core/webrtc_session.go

@ -24,6 +24,7 @@ import (
"github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc" "github.com/bluenviron/mediamtx/internal/protocols/webrtc"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -610,16 +611,14 @@ func (s *webRTCSession) runRead() (int, error) {
s.Log(logger.Info, "is reading from path '%s', %s", s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, readerMediaInfo(writer, res.stream)) res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf() onUnreadHook := hooks.OnRead(hooks.OnReadParams{
Logger: s,
onUnreadHook := onReadHook( ExternalCmdPool: s.externalCmdPool,
s.externalCmdPool, Conf: res.path.safeConf(),
pathConf, ExternalCmdEnv: res.path.externalCmdEnv(),
res.path, Reader: s.apiReaderDescribe(),
s.apiReaderDescribe(), Query: s.req.query,
s.req.query, })
s,
)
defer onUnreadHook() defer onUnreadHook()
writer.Start() writer.Start()

2
internal/hooks/hooks.go

@ -0,0 +1,2 @@
// Package hooks contains hook implementations.
package hooks

65
internal/hooks/on_connect.go

@ -0,0 +1,65 @@
package hooks
import (
"net"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnConnectParams are the parameters of OnConnect.
type OnConnectParams struct {
Logger logger.Writer
ExternalCmdPool *externalcmd.Pool
RunOnConnect string
RunOnConnectRestart bool
RunOnDisconnect string
RTSPAddress string
Desc defs.APIPathSourceOrReader
}
// OnConnect is the OnConnect hook.
func OnConnect(params OnConnectParams) func() {
var env externalcmd.Environment
var onConnectCmd *externalcmd.Cmd
if params.RunOnConnect != "" || params.RunOnDisconnect != "" {
_, port, _ := net.SplitHostPort(params.RTSPAddress)
env = externalcmd.Environment{
"RTSP_PORT": port,
"MTX_CONN_TYPE": params.Desc.Type,
"MTX_CONN_ID": params.Desc.ID,
}
}
if params.RunOnConnect != "" {
params.Logger.Log(logger.Info, "runOnConnect command started")
onConnectCmd = externalcmd.NewCmd(
params.ExternalCmdPool,
params.RunOnConnect,
params.RunOnConnectRestart,
env,
func(err error) {
params.Logger.Log(logger.Info, "runOnConnect command exited: %v", err)
})
}
return func() {
if onConnectCmd != nil {
onConnectCmd.Close()
params.Logger.Log(logger.Info, "runOnConnect command stopped")
}
if params.RunOnDisconnect != "" {
params.Logger.Log(logger.Info, "runOnDisconnect command launched")
externalcmd.NewCmd(
params.ExternalCmdPool,
params.RunOnDisconnect,
false,
env,
nil)
}
}
}

57
internal/hooks/on_demand.go

@ -0,0 +1,57 @@
package hooks
import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnDemandParams are the parameters of OnDemand.
type OnDemandParams struct {
Logger logger.Writer
ExternalCmdPool *externalcmd.Pool
Conf *conf.Path
ExternalCmdEnv externalcmd.Environment
Query string
}
// OnDemand is the OnDemand hook.
func OnDemand(params OnDemandParams) func(string) {
var env externalcmd.Environment
var onDemandCmd *externalcmd.Cmd
if params.Conf.RunOnDemand != "" || params.Conf.RunOnUnDemand != "" {
env = params.ExternalCmdEnv
env["MTX_QUERY"] = params.Query
}
if params.Conf.RunOnDemand != "" {
params.Logger.Log(logger.Info, "runOnDemand command started")
onDemandCmd = externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnDemand,
params.Conf.RunOnDemandRestart,
env,
func(err error) {
params.Logger.Log(logger.Info, "runOnDemand command exited: %v", err)
})
}
return func(reason string) {
if onDemandCmd != nil {
onDemandCmd.Close()
params.Logger.Log(logger.Info, "runOnDemand command stopped: %v", reason)
}
if params.Conf.RunOnUnDemand != "" {
params.Logger.Log(logger.Info, "runOnUnDemand command launched")
externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnUnDemand,
false,
env,
nil)
}
}
}

39
internal/hooks/on_init.go

@ -0,0 +1,39 @@
package hooks
import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnInitParams are the parameters of OnInit.
type OnInitParams struct {
Logger logger.Writer
ExternalCmdPool *externalcmd.Pool
Conf *conf.Path
ExternalCmdEnv externalcmd.Environment
}
// OnInit is the OnInit hook.
func OnInit(params OnInitParams) func() {
var onInitCmd *externalcmd.Cmd
if params.Conf.RunOnInit != "" {
params.Logger.Log(logger.Info, "runOnInit command started")
onInitCmd = externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnInit,
params.Conf.RunOnInitRestart,
params.ExternalCmdEnv,
func(err error) {
params.Logger.Log(logger.Info, "runOnInit command exited: %v", err)
})
}
return func() {
if onInitCmd != nil {
onInitCmd.Close()
params.Logger.Log(logger.Info, "runOnInit command stopped")
}
}
}

61
internal/hooks/on_read.go

@ -0,0 +1,61 @@
package hooks
import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnReadParams are the parameters of OnRead.
type OnReadParams struct {
Logger logger.Writer
ExternalCmdPool *externalcmd.Pool
Conf *conf.Path
ExternalCmdEnv externalcmd.Environment
Reader defs.APIPathSourceOrReader
Query string
}
// OnRead is the OnRead hook.
func OnRead(params OnReadParams) func() {
var env externalcmd.Environment
var onReadCmd *externalcmd.Cmd
if params.Conf.RunOnRead != "" || params.Conf.RunOnUnread != "" {
env = params.ExternalCmdEnv
desc := params.Reader
env["MTX_QUERY"] = params.Query
env["MTX_READER_TYPE"] = desc.Type
env["MTX_READER_ID"] = desc.ID
}
if params.Conf.RunOnRead != "" {
params.Logger.Log(logger.Info, "runOnRead command started")
onReadCmd = externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnRead,
params.Conf.RunOnReadRestart,
env,
func(err error) {
params.Logger.Log(logger.Info, "runOnRead command exited: %v", err)
})
}
return func() {
if onReadCmd != nil {
onReadCmd.Close()
params.Logger.Log(logger.Info, "runOnRead command stopped")
}
if params.Conf.RunOnUnread != "" {
params.Logger.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnUnread,
false,
env,
nil)
}
}
}

60
internal/hooks/on_ready.go

@ -0,0 +1,60 @@
package hooks
import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnReadyParams are the parameters of OnReady.
type OnReadyParams struct {
Logger logger.Writer
ExternalCmdPool *externalcmd.Pool
Conf *conf.Path
ExternalCmdEnv externalcmd.Environment
Desc defs.APIPathSourceOrReader
Query string
}
// OnReady is the OnReady hook.
func OnReady(params OnReadyParams) func() {
var env externalcmd.Environment
var onReadyCmd *externalcmd.Cmd
if params.Conf.RunOnReady != "" || params.Conf.RunOnNotReady != "" {
env = params.ExternalCmdEnv
env["MTX_QUERY"] = params.Query
env["MTX_SOURCE_TYPE"] = params.Desc.Type
env["MTX_SOURCE_ID"] = params.Desc.ID
}
if params.Conf.RunOnReady != "" {
params.Logger.Log(logger.Info, "runOnReady command started")
onReadyCmd = externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnReady,
params.Conf.RunOnReadyRestart,
env,
func(err error) {
params.Logger.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
return func() {
if onReadyCmd != nil {
onReadyCmd.Close()
params.Logger.Log(logger.Info, "runOnReady command stopped")
}
if params.Conf.RunOnNotReady != "" {
params.Logger.Log(logger.Info, "runOnNotReady command launched")
externalcmd.NewCmd(
params.ExternalCmdPool,
params.Conf.RunOnNotReady,
false,
env,
nil)
}
}
}
Loading…
Cancel
Save