Browse Source

fix crash when publishing to a path with 'runOnDemand' from outside 'runOnDemand' (#2636) (#2637)

pull/2638/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
4f876ed207
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      README.md
  2. 87
      internal/core/path.go
  3. 3
      mediamtx.yml

11
README.md

@ -1272,6 +1272,7 @@ The server allows to specify commands that are executed when a certain event hap
`runOnConnect` allows to run a command when a client connects to the server: `runOnConnect` allows to run a command when a client connects to the server:
```yml ```yml
# Command to run when a client connects to the server.
# This is terminated with SIGINT when a client disconnects from the server. # This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available: # The following environment variables are available:
# * RTSP_PORT: RTSP server port # * RTSP_PORT: RTSP server port
@ -1285,6 +1286,7 @@ runOnConnectRestart: no
`runOnDisconnect` allows to run a command when a client disconnects from the server: `runOnDisconnect` allows to run a command when a client disconnects from the server:
```yml ```yml
# Command to run when a client disconnects from the server.
# Environment variables are the same of runOnConnect. # Environment variables are the same of runOnConnect.
runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID
``` ```
@ -1294,7 +1296,8 @@ runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&c
```yml ```yml
paths: paths:
mypath: mypath:
# This is terminated with SIGINT when the program closes. # Command to run when this path is initialized.
# This can be used to publish a stream when the server is launched.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
# * RTSP_PORT: RTSP server port # * RTSP_PORT: RTSP server port
@ -1310,6 +1313,8 @@ paths:
```yml ```yml
paths: paths:
mypath: mypath:
# Command to run when this path is requested by a reader
# and no one is publishing to this path yet.
# This is terminated with SIGINT when the program closes. # This is terminated with SIGINT when the program closes.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -1326,6 +1331,8 @@ paths:
```yml ```yml
pathDefaults: pathDefaults:
# Command to run when the stream is ready to be read, whenever it is
# published by a client or pulled from a server / camera.
# This is terminated with SIGINT when the stream is not ready anymore. # This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -1344,6 +1351,7 @@ pathDefaults:
```yml ```yml
pathDefaults: pathDefaults:
# Command to run when the stream is not available anymore.
# Environment variables are the same of runOnReady. # Environment variables are the same of runOnReady.
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
``` ```
@ -1352,6 +1360,7 @@ pathDefaults:
```yml ```yml
pathDefaults: pathDefaults:
# Command to run when a client starts reading.
# This is terminated with SIGINT when a client stops reading. # This is terminated with SIGINT when a client stops reading.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name

87
internal/core/path.go

@ -201,8 +201,8 @@ type path struct {
chStaticSourceSetReady chan defs.PathSourceStaticSetReadyReq chStaticSourceSetReady chan defs.PathSourceStaticSetReadyReq
chStaticSourceSetNotReady chan defs.PathSourceStaticSetNotReadyReq chStaticSourceSetNotReady chan defs.PathSourceStaticSetNotReadyReq
chDescribe chan pathDescribeReq chDescribe chan pathDescribeReq
chRemovePublisher chan pathRemovePublisherReq
chAddPublisher chan pathAddPublisherReq chAddPublisher chan pathAddPublisherReq
chRemovePublisher chan pathRemovePublisherReq
chStartPublisher chan pathStartPublisherReq chStartPublisher chan pathStartPublisherReq
chStopPublisher chan pathStopPublisherReq chStopPublisher chan pathStopPublisherReq
chAddReader chan pathAddReaderReq chAddReader chan pathAddReaderReq
@ -254,8 +254,8 @@ func newPath(
chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq), chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq),
chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq), chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq),
chDescribe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
chRemovePublisher: make(chan pathRemovePublisherReq),
chAddPublisher: make(chan pathAddPublisherReq), chAddPublisher: make(chan pathAddPublisherReq),
chRemovePublisher: make(chan pathRemovePublisherReq),
chStartPublisher: make(chan pathStartPublisherReq), chStartPublisher: make(chan pathStartPublisherReq),
chStopPublisher: make(chan pathStopPublisherReq), chStopPublisher: make(chan pathStopPublisherReq),
chAddReader: make(chan pathAddReaderReq), chAddReader: make(chan pathAddReaderReq),
@ -357,7 +357,7 @@ func (pa *path) run() {
} }
if pa.onUnDemandHook != nil { if pa.onUnDemandHook != nil {
pa.onUnDemandHook("path closed") pa.onUnDemandHook("path destroyed")
} }
pa.Log(logger.Debug, "destroyed: %v", err) pa.Log(logger.Debug, "destroyed: %v", err)
@ -414,6 +414,9 @@ func (pa *path) runInner() error {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chAddPublisher:
pa.doAddPublisher(req)
case req := <-pa.chRemovePublisher: case req := <-pa.chRemovePublisher:
pa.doRemovePublisher(req) pa.doRemovePublisher(req)
@ -421,9 +424,6 @@ func (pa *path) runInner() error {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chAddPublisher:
pa.doAddPublisher(req)
case req := <-pa.chStartPublisher: case req := <-pa.chStartPublisher:
pa.doStartPublisher(req) pa.doStartPublisher(req)
@ -519,22 +519,11 @@ func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) {
if pa.conf.HasOnDemandStaticSource() { if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop() pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer() pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose() pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
} }
pa.consumeOnHoldRequests()
req.Res <- defs.PathSourceStaticSetReadyRes{Stream: pa.stream} req.Res <- defs.PathSourceStaticSetReadyRes{Stream: pa.stream}
} }
@ -649,25 +638,14 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) {
pa.name, pa.name,
mediaInfo(req.desc.Medias)) mediaInfo(req.desc.Medias))
if pa.conf.HasOnDemandPublisher() { if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = newEmptyTimer() pa.onDemandPublisherReadyTimer = newEmptyTimer()
pa.onDemandPublisherScheduleClose() pa.onDemandPublisherScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
} }
pa.consumeOnHoldRequests()
req.res <- pathStartPublisherRes{stream: pa.stream} req.res <- pathStartPublisherRes{stream: pa.stream}
} }
@ -840,20 +818,15 @@ func (pa *path) onDemandPublisherScheduleClose() {
} }
func (pa *path) onDemandPublisherStop(reason string) { func (pa *path) onDemandPublisherStop(reason string) {
if pa.source != nil {
pa.source.(publisher).close()
pa.executeRemovePublisher()
}
if pa.onDemandPublisherState == pathOnDemandStateClosing { if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherCloseTimer.Stop() pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer() pa.onDemandPublisherCloseTimer = newEmptyTimer()
} }
pa.onDemandPublisherState = pathOnDemandStateInitial
pa.onUnDemandHook(reason) pa.onUnDemandHook(reason)
pa.onUnDemandHook = nil pa.onUnDemandHook = nil
pa.onDemandPublisherState = pathOnDemandStateInitial
} }
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
@ -881,6 +854,20 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
return nil return nil
} }
func (pa *path) consumeOnHoldRequests() {
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}
func (pa *path) setNotReady() { func (pa *path) setNotReady() {
pa.parent.pathNotReady(pa) pa.parent.pathNotReady(pa)
@ -1048,16 +1035,6 @@ func (pa *path) describe(req pathDescribeReq) pathDescribeRes {
} }
} }
// removePublisher is called by a publisher.
func (pa *path) removePublisher(req pathRemovePublisherReq) {
req.res = make(chan struct{})
select {
case pa.chRemovePublisher <- req:
<-req.res
case <-pa.ctx.Done():
}
}
// addPublisher is called by a publisher through pathManager. // addPublisher is called by a publisher through pathManager.
func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes { func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
select { select {
@ -1068,6 +1045,16 @@ func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
} }
} }
// removePublisher is called by a publisher.
func (pa *path) removePublisher(req pathRemovePublisherReq) {
req.res = make(chan struct{})
select {
case pa.chRemovePublisher <- req:
<-req.res
case <-pa.ctx.Done():
}
}
// startPublisher is called by a publisher. // startPublisher is called by a publisher.
func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes { func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes {
req.res = make(chan pathStartPublisherRes) req.res = make(chan pathStartPublisherRes)

3
mediamtx.yml

@ -468,7 +468,8 @@ pathDefaults:
# Restart the command if it exits. # Restart the command if it exits.
runOnInitRestart: no runOnInitRestart: no
# Command to run when this path is requested by a reader. # Command to run when this path is requested by a reader
# and no one is publishing to this path yet.
# This can be used to publish a stream on demand. # This can be used to publish a stream on demand.
# This is terminated with SIGINT when the path is not requested anymore. # This is terminated with SIGINT when the path is not requested anymore.
# The following environment variables are available: # The following environment variables are available:

Loading…
Cancel
Save