Browse Source

add runOnDisconnect, runOnNotReady, runOnUnread (#1464) (#2355)

pull/2356/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
ed77560811
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      README.md
  2. 6
      apidocs/openapi.yaml
  3. 1
      internal/conf/conf.go
  4. 2
      internal/conf/path.go
  5. 79
      internal/core/conn.go
  6. 17
      internal/core/core.go
  7. 10
      internal/core/path.go
  8. 233
      internal/core/path_manager_test.go
  9. 411
      internal/core/path_test.go
  10. 107
      internal/core/rtmp_conn.go
  11. 4
      internal/core/rtmp_server.go
  12. 31
      internal/core/rtmp_server_test.go
  13. 81
      internal/core/rtsp_conn.go
  14. 4
      internal/core/rtsp_server.go
  15. 30
      internal/core/rtsp_server_test.go
  16. 28
      internal/core/rtsp_session.go
  17. 42
      internal/core/srt_conn.go
  18. 64
      internal/core/srt_server.go
  19. 19
      internal/core/webrtc_manager.go
  20. 48
      internal/core/webrtc_session.go
  21. 18
      internal/externalcmd/cmd.go
  22. 5
      internal/externalcmd/cmd_unix.go
  23. 5
      internal/externalcmd/cmd_win.go
  24. 18
      mediamtx.yml

2
README.md

@ -1155,7 +1155,7 @@ All available recording parameters are listed in the [sample configuration file]
Currently the server supports recording tracks encoded with the following codecs: Currently the server supports recording tracks encoded with the following codecs:
* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video * Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video
* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1 Audio (MP3) * Audio: Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)
### Forward streams to another server ### Forward streams to another server

6
apidocs/openapi.yaml

@ -53,6 +53,8 @@ components:
type: string type: string
runOnConnectRestart: runOnConnectRestart:
type: boolean type: boolean
runOnDisconnect:
type: string
# RTSP # RTSP
rtsp: rtsp:
@ -335,10 +337,14 @@ components:
type: string type: string
runOnReadyRestart: runOnReadyRestart:
type: boolean type: boolean
runOnNotReady:
type: string
runOnRead: runOnRead:
type: string type: string
runOnReadRestart: runOnReadRestart:
type: boolean type: boolean
runOnUnread:
type: string
Path: Path:
type: object type: object

1
internal/conf/conf.go

@ -107,6 +107,7 @@ type Conf struct {
PPROFAddress string `json:"pprofAddress"` PPROFAddress string `json:"pprofAddress"`
RunOnConnect string `json:"runOnConnect"` RunOnConnect string `json:"runOnConnect"`
RunOnConnectRestart bool `json:"runOnConnectRestart"` RunOnConnectRestart bool `json:"runOnConnectRestart"`
RunOnDisconnect string `json:"runOnDisconnect"`
// RTSP // RTSP
RTSP bool `json:"rtsp"` RTSP bool `json:"rtsp"`

2
internal/conf/path.go

@ -116,8 +116,10 @@ type PathConf struct {
RunOnDemandCloseAfter StringDuration `json:"runOnDemandCloseAfter"` RunOnDemandCloseAfter StringDuration `json:"runOnDemandCloseAfter"`
RunOnReady string `json:"runOnReady"` RunOnReady string `json:"runOnReady"`
RunOnReadyRestart bool `json:"runOnReadyRestart"` RunOnReadyRestart bool `json:"runOnReadyRestart"`
RunOnNotReady string `json:"runOnNotReady"`
RunOnRead string `json:"runOnRead"` RunOnRead string `json:"runOnRead"`
RunOnReadRestart bool `json:"runOnReadRestart"` RunOnReadRestart bool `json:"runOnReadRestart"`
RunOnUnread string `json:"runOnUnread"`
} }
func (pconf *PathConf) check(conf *Conf, name string) error { func (pconf *PathConf) check(conf *Conf, name string) error {

79
internal/core/conn.go

@ -0,0 +1,79 @@
package core
import (
"net"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
)
type conn struct {
rtspAddress string
runOnConnect string
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
logger logger.Writer
onConnectCmd *externalcmd.Cmd
}
func newConn(
rtspAddress string,
runOnConnect string,
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
logger logger.Writer,
) *conn {
return &conn{
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
logger: logger,
}
}
func (c *conn) open() {
if c.runOnConnect != "" {
c.logger.Log(logger.Info, "runOnConnect command started")
_, port, _ := net.SplitHostPort(c.rtspAddress)
c.onConnectCmd = externalcmd.NewCmd(
c.externalCmdPool,
c.runOnConnect,
c.runOnConnectRestart,
externalcmd.Environment{
"MTX_PATH": "",
"RTSP_PATH": "", // deprecated
"RTSP_PORT": port,
},
func(err error) {
c.logger.Log(logger.Info, "runOnConnect command exited: %v", err)
})
}
}
func (c *conn) close() {
if c.onConnectCmd != nil {
c.onConnectCmd.Close()
c.logger.Log(logger.Info, "runOnConnect command stopped")
}
if c.runOnDisconnect != "" {
c.logger.Log(logger.Info, "runOnDisconnect command launched")
_, port, _ := net.SplitHostPort(c.rtspAddress)
externalcmd.NewCmd(
c.externalCmdPool,
c.runOnDisconnect,
false,
externalcmd.Environment{
"MTX_PATH": "",
"RTSP_PATH": "", // deprecated
"RTSP_PORT": port,
},
nil)
}
}

17
internal/core/core.go

@ -297,6 +297,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.Protocols, p.conf.Protocols,
p.conf.RunOnConnect, p.conf.RunOnConnect,
p.conf.RunOnConnectRestart, p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool, p.externalCmdPool,
p.metrics, p.metrics,
p.pathManager, p.pathManager,
@ -331,6 +332,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.Protocols, p.conf.Protocols,
p.conf.RunOnConnect, p.conf.RunOnConnect,
p.conf.RunOnConnectRestart, p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool, p.externalCmdPool,
p.metrics, p.metrics,
p.pathManager, p.pathManager,
@ -356,6 +358,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RTSPAddress, p.conf.RTSPAddress,
p.conf.RunOnConnect, p.conf.RunOnConnect,
p.conf.RunOnConnectRestart, p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool, p.externalCmdPool,
p.metrics, p.metrics,
p.pathManager, p.pathManager,
@ -381,6 +384,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RTSPAddress, p.conf.RTSPAddress,
p.conf.RunOnConnect, p.conf.RunOnConnect,
p.conf.RunOnConnectRestart, p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool, p.externalCmdPool,
p.metrics, p.metrics,
p.pathManager, p.pathManager,
@ -434,6 +438,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.WebRTCICEHostNAT1To1IPs, p.conf.WebRTCICEHostNAT1To1IPs,
p.conf.WebRTCICEUDPMuxAddress, p.conf.WebRTCICEUDPMuxAddress,
p.conf.WebRTCICETCPMuxAddress, p.conf.WebRTCICETCPMuxAddress,
p.externalCmdPool,
p.pathManager, p.pathManager,
p.metrics, p.metrics,
p, p,
@ -447,10 +452,14 @@ func (p *Core) createResources(initial bool) error {
p.srtServer == nil { p.srtServer == nil {
p.srtServer, err = newSRTServer( p.srtServer, err = newSRTServer(
p.conf.SRTAddress, p.conf.SRTAddress,
p.conf.RTSPAddress,
p.conf.ReadTimeout, p.conf.ReadTimeout,
p.conf.WriteTimeout, p.conf.WriteTimeout,
p.conf.WriteQueueSize, p.conf.WriteQueueSize,
p.conf.UDPMaxPayloadSize, p.conf.UDPMaxPayloadSize,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool, p.externalCmdPool,
p.pathManager, p.pathManager,
p, p,
@ -549,6 +558,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
!reflect.DeepEqual(newConf.Protocols, p.conf.Protocols) || !reflect.DeepEqual(newConf.Protocols, p.conf.Protocols) ||
newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
newConf.RunOnDisconnect != p.conf.RunOnDisconnect ||
closeMetrics || closeMetrics ||
closePathManager || closePathManager ||
closeLogger closeLogger
@ -567,6 +577,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
!reflect.DeepEqual(newConf.Protocols, p.conf.Protocols) || !reflect.DeepEqual(newConf.Protocols, p.conf.Protocols) ||
newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
newConf.RunOnDisconnect != p.conf.RunOnDisconnect ||
closeMetrics || closeMetrics ||
closePathManager || closePathManager ||
closeLogger closeLogger
@ -581,6 +592,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTSPAddress != p.conf.RTSPAddress || newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
newConf.RunOnDisconnect != p.conf.RunOnDisconnect ||
closeMetrics || closeMetrics ||
closePathManager || closePathManager ||
closeLogger closeLogger
@ -597,6 +609,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.RTSPAddress != p.conf.RTSPAddress || newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
newConf.RunOnDisconnect != p.conf.RunOnDisconnect ||
closeMetrics || closeMetrics ||
closePathManager || closePathManager ||
closeLogger closeLogger
@ -644,10 +657,14 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeSRTServer := newConf == nil || closeSRTServer := newConf == nil ||
newConf.SRT != p.conf.SRT || newConf.SRT != p.conf.SRT ||
newConf.SRTAddress != p.conf.SRTAddress || newConf.SRTAddress != p.conf.SRTAddress ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.WriteQueueSize != p.conf.WriteQueueSize ||
newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
newConf.RunOnDisconnect != p.conf.RunOnDisconnect ||
closePathManager || closePathManager ||
closeLogger closeLogger

10
internal/core/path.go

@ -935,6 +935,16 @@ func (pa *path) setNotReady() {
pa.Log(logger.Info, "runOnReady command stopped") pa.Log(logger.Info, "runOnReady command stopped")
} }
if pa.conf.RunOnNotReady != "" {
pa.Log(logger.Info, "runOnNotReady command launched")
externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnNotReady,
false,
pa.externalCmdEnv(),
nil)
}
if pa.recordAgent != nil { if pa.recordAgent != nil {
pa.recordAgent.Close() pa.recordAgent.Close()
pa.recordAgent = nil pa.recordAgent = nil

233
internal/core/path_manager_test.go

@ -2,19 +2,11 @@ package core
import ( import (
"bufio" "bufio"
"fmt"
"net" "net"
"os"
"os/exec"
"path/filepath"
"testing" "testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
"github.com/bluenviron/gortsplib/v4/pkg/url" "github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -91,228 +83,3 @@ func TestPathAutoDeletion(t *testing.T) {
}) })
} }
} }
func TestPathRunOnDemand(t *testing.T) {
doneFile := filepath.Join(os.TempDir(), "ondemand_done")
srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile, []byte(`
package main
import (
"os"
"os/signal"
"syscall"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
func main() {
if os.Getenv("G1") != "on" {
panic("environment not set")
}
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
PacketizationMode: 1,
}},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("MTX_PATH"),
&description.Session{Medias: []*description.Media{medi}})
if err != nil {
panic(err)
}
defer source.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
<-c
err = os.WriteFile("`+doneFile+`", []byte(""), 0644)
if err != nil {
panic(err)
}
}
`), 0o644)
require.NoError(t, err)
execFile := filepath.Join(os.TempDir(), "ondemand_cmd")
cmd := exec.Command("go", "build", "-o", execFile, srcFile)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Run()
require.NoError(t, err)
defer os.Remove(execFile)
os.Remove(srcFile)
for _, ca := range []string{"describe", "setup", "describe and setup"} {
t.Run(ca, func(t *testing.T) {
defer os.Remove(doneFile)
p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+
"webrtc: no\n"+
"paths:\n"+
" '~^(on)demand$':\n"+
" runOnDemand: %s\n"+
" runOnDemandCloseAfter: 1s\n", execFile))
require.Equal(t, true, ok)
defer p1.Close()
var control string
func() {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
br := bufio.NewReader(conn)
if ca == "describe" || ca == "describe and setup" {
u, err := url.Parse("rtsp://localhost:8554/ondemand")
require.NoError(t, err)
byts, _ := base.Request{
Method: base.Describe,
URL: u,
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
},
}.Marshal()
_, err = conn.Write(byts)
require.NoError(t, err)
var res base.Response
err = res.Unmarshal(br)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body)
require.NoError(t, err)
control, _ = desc.MediaDescriptions[0].Attribute("control")
} else {
control = "rtsp://localhost:8554/ondemand/"
}
if ca == "setup" || ca == "describe and setup" {
u, err := url.Parse(control)
require.NoError(t, err)
byts, _ := base.Request{
Method: base.Setup,
URL: u,
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": headers.Transport{
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}.Marshal(),
},
}.Marshal()
_, err = conn.Write(byts)
require.NoError(t, err)
var res base.Response
err = res.Unmarshal(br)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
}()
for {
_, err := os.Stat(doneFile)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
})
}
}
func TestPathRunOnReady(t *testing.T) {
doneFile := filepath.Join(os.TempDir(), "onready_done")
defer os.Remove(doneFile)
p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+
"webrtc: no\n"+
"paths:\n"+
" test:\n"+
" runOnReady: touch %s\n",
doneFile))
require.Equal(t, true, ok)
defer p.Close()
medi := testMediaH264
c := gortsplib.Client{}
err := c.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer c.Close()
time.Sleep(1 * time.Second)
_, err = os.Stat(doneFile)
require.NoError(t, err)
}
func TestPathMaxReaders(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n" +
" maxReaders: 1\n")
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/mystream",
&description.Session{Medias: []*description.Media{
testMediaH264,
testMediaAAC,
}})
require.NoError(t, err)
defer source.Close()
for i := 0; i < 2; i++ {
reader := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/mystream")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer reader.Close()
desc, _, err := reader.Describe(u)
require.NoError(t, err)
err = reader.SetupAll(desc.BaseURL, desc.Medias)
if i != 1 {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}
}

411
internal/core/path_test.go

@ -0,0 +1,411 @@
package core
import (
"bufio"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
rtspurl "github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/datarhei/gosrt"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/rtmp"
)
func TestPathRunOnDemand(t *testing.T) {
onDemandFile := filepath.Join(os.TempDir(), "ondemand")
srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile, []byte(`
package main
import (
"os"
"os/signal"
"syscall"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
func main() {
if os.Getenv("G1") != "on" {
panic("environment not set")
}
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
PacketizationMode: 1,
}},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("MTX_PATH"),
&description.Session{Medias: []*description.Media{medi}})
if err != nil {
panic(err)
}
defer source.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
<-c
err = os.WriteFile("`+onDemandFile+`", []byte(""), 0644)
if err != nil {
panic(err)
}
}
`), 0o644)
require.NoError(t, err)
execFile := filepath.Join(os.TempDir(), "ondemand_cmd")
cmd := exec.Command("go", "build", "-o", execFile, srcFile)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Run()
require.NoError(t, err)
defer os.Remove(execFile)
os.Remove(srcFile)
for _, ca := range []string{"describe", "setup", "describe and setup"} {
t.Run(ca, func(t *testing.T) {
defer os.Remove(onDemandFile)
p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+
"webrtc: no\n"+
"paths:\n"+
" '~^(on)demand$':\n"+
" runOnDemand: %s\n"+
" runOnDemandCloseAfter: 1s\n", execFile))
require.Equal(t, true, ok)
defer p1.Close()
var control string
func() {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
br := bufio.NewReader(conn)
if ca == "describe" || ca == "describe and setup" {
u, err := rtspurl.Parse("rtsp://localhost:8554/ondemand")
require.NoError(t, err)
byts, _ := base.Request{
Method: base.Describe,
URL: u,
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
},
}.Marshal()
_, err = conn.Write(byts)
require.NoError(t, err)
var res base.Response
err = res.Unmarshal(br)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body)
require.NoError(t, err)
control, _ = desc.MediaDescriptions[0].Attribute("control")
} else {
control = "rtsp://localhost:8554/ondemand/"
}
if ca == "setup" || ca == "describe and setup" {
u, err := rtspurl.Parse(control)
require.NoError(t, err)
byts, _ := base.Request{
Method: base.Setup,
URL: u,
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": headers.Transport{
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}.Marshal(),
},
}.Marshal()
_, err = conn.Write(byts)
require.NoError(t, err)
var res base.Response
err = res.Unmarshal(br)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
}()
for {
_, err := os.Stat(onDemandFile)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
})
}
}
func TestPathRunOnConnect(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt"} {
t.Run(ca, func(t *testing.T) {
onConnectFile := filepath.Join(os.TempDir(), "onconnect")
defer os.Remove(onConnectFile)
onDisconnectFile := filepath.Join(os.TempDir(), "ondisconnect")
defer os.Remove(onDisconnectFile)
func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
"runOnConnect: touch %s\n"+
"runOnDisconnect: touch %s\n",
onConnectFile, onDisconnectFile))
require.Equal(t, true, ok)
defer p.Close()
switch ca {
case "rtsp":
c := gortsplib.Client{}
err := c.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer c.Close()
case "rtmp":
u, err := url.Parse("rtmp://127.0.0.1:1935/test")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
_, err = rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
case "srt":
conf := srt.DefaultConfig()
address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=publish:test")
require.NoError(t, err)
err = conf.Validate()
require.NoError(t, err)
c, err := srt.Dial("srt", address, conf)
require.NoError(t, err)
defer c.Close()
}
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onConnectFile)
require.NoError(t, err)
_, err = os.Stat(onDisconnectFile)
require.NoError(t, err)
})
}
}
func TestPathRunOnReady(t *testing.T) {
onReadyFile := filepath.Join(os.TempDir(), "onready")
defer os.Remove(onReadyFile)
onNotReadyFile := filepath.Join(os.TempDir(), "onunready")
defer os.Remove(onNotReadyFile)
func() {
p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+
"webrtc: no\n"+
"paths:\n"+
" test:\n"+
" runOnReady: touch %s\n"+
" runOnNotReady: touch %s\n",
onReadyFile, onNotReadyFile))
require.Equal(t, true, ok)
defer p.Close()
c := gortsplib.Client{}
err := c.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer c.Close()
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onReadyFile)
require.NoError(t, err)
_, err = os.Stat(onNotReadyFile)
require.NoError(t, err)
}
func TestPathRunOnRead(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} {
t.Run(ca, func(t *testing.T) {
onReadFile := filepath.Join(os.TempDir(), "onread")
defer os.Remove(onReadFile)
onUnreadFile := filepath.Join(os.TempDir(), "onunread")
defer os.Remove(onUnreadFile)
func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
" runOnRead: touch %s\n"+
" runOnUnread: touch %s\n",
onReadFile, onUnreadFile))
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer source.Close()
switch ca {
case "rtsp":
reader := gortsplib.Client{}
u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/test")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer reader.Close()
desc, _, err := reader.Describe(u)
require.NoError(t, err)
err = reader.SetupAll(desc.BaseURL, desc.Medias)
require.NoError(t, err)
_, err = reader.Play(nil)
require.NoError(t, err)
case "rtmp":
u, err := url.Parse("rtmp://127.0.0.1:1935/test")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, false)
require.NoError(t, err)
_, err = rtmp.NewReader(conn)
require.NoError(t, err)
case "srt":
conf := srt.DefaultConfig()
address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=read:test")
require.NoError(t, err)
err = conf.Validate()
require.NoError(t, err)
reader, err := srt.Dial("srt", address, conf)
require.NoError(t, err)
defer reader.Close()
case "webrtc":
hc := &http.Client{Transport: &http.Transport{}}
c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep", false)
defer c.close()
}
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onReadFile)
require.NoError(t, err)
_, err = os.Stat(onUnreadFile)
require.NoError(t, err)
})
}
}
func TestPathMaxReaders(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n" +
" maxReaders: 1\n")
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/mystream",
&description.Session{Medias: []*description.Media{
testMediaH264,
testMediaAAC,
}})
require.NoError(t, err)
defer source.Close()
for i := 0; i < 2; i++ {
reader := gortsplib.Client{}
u, err := rtspurl.Parse("rtsp://127.0.0.1:8554/mystream")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer reader.Close()
desc, _, err := reader.Describe(u)
require.NoError(t, err)
err = reader.SetupAll(desc.BaseURL, desc.Medias)
if i != 1 {
require.NoError(t, err)
} else {
require.Error(t, err)
}
}
}

107
internal/core/rtmp_conn.go

@ -56,25 +56,24 @@ type rtmpConnParent interface {
} }
type rtmpConn struct { type rtmpConn struct {
isTLS bool *conn
rtspAddress string
readTimeout conf.StringDuration isTLS bool
writeTimeout conf.StringDuration readTimeout conf.StringDuration
writeQueueSize int writeTimeout conf.StringDuration
runOnConnect string writeQueueSize int
runOnConnectRestart bool wg *sync.WaitGroup
wg *sync.WaitGroup nconn net.Conn
nconn net.Conn externalCmdPool *externalcmd.Pool
externalCmdPool *externalcmd.Pool pathManager rtmpConnPathManager
pathManager rtmpConnPathManager parent rtmpConnParent
parent rtmpConnParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
uuid uuid.UUID uuid uuid.UUID
created time.Time created time.Time
mutex sync.RWMutex mutex sync.RWMutex
conn *rtmp.Conn rconn *rtmp.Conn
state rtmpConnState state rtmpConnState
pathName string pathName string
} }
@ -88,6 +87,7 @@ func newRTMPConn(
writeQueueSize int, writeQueueSize int,
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
runOnDisconnect string,
wg *sync.WaitGroup, wg *sync.WaitGroup,
nconn net.Conn, nconn net.Conn,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
@ -97,24 +97,30 @@ func newRTMPConn(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
c := &rtmpConn{ c := &rtmpConn{
isTLS: isTLS, isTLS: isTLS,
rtspAddress: rtspAddress, readTimeout: readTimeout,
readTimeout: readTimeout, writeTimeout: writeTimeout,
writeTimeout: writeTimeout, writeQueueSize: writeQueueSize,
writeQueueSize: writeQueueSize, wg: wg,
runOnConnect: runOnConnect, nconn: nconn,
runOnConnectRestart: runOnConnectRestart, externalCmdPool: externalCmdPool,
wg: wg, pathManager: pathManager,
nconn: nconn, parent: parent,
externalCmdPool: externalCmdPool, ctx: ctx,
pathManager: pathManager, ctxCancel: ctxCancel,
parent: parent, uuid: uuid.New(),
ctx: ctx, created: time.Now(),
ctxCancel: ctxCancel,
uuid: uuid.New(),
created: time.Now(),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
c.wg.Add(1) c.wg.Add(1)
@ -139,30 +145,11 @@ func (c *rtmpConn) ip() net.IP {
return c.nconn.RemoteAddr().(*net.TCPAddr).IP return c.nconn.RemoteAddr().(*net.TCPAddr).IP
} }
func (c *rtmpConn) run() { func (c *rtmpConn) run() { //nolint:dupl
defer c.wg.Done() defer c.wg.Done()
if c.runOnConnect != "" { c.conn.open()
c.Log(logger.Info, "runOnConnect command started") defer c.conn.close()
_, port, _ := net.SplitHostPort(c.rtspAddress)
onConnectCmd := externalcmd.NewCmd(
c.externalCmdPool,
c.runOnConnect,
c.runOnConnectRestart,
externalcmd.Environment{
"MTX_PATH": "",
"RTSP_PATH": "", // deprecated
"RTSP_PORT": port,
},
func(err error) {
c.Log(logger.Info, "runOnConnect command exited: %v", err)
})
defer func() {
onConnectCmd.Close()
c.Log(logger.Info, "runOnConnect command stopped")
}()
}
err := c.runInner() err := c.runInner()
@ -200,7 +187,7 @@ func (c *rtmpConn) runReader() error {
} }
c.mutex.Lock() c.mutex.Lock()
c.conn = conn c.rconn = conn
c.mutex.Unlock() c.mutex.Unlock()
if !publish { if !publish {
@ -290,6 +277,18 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
}() }()
} }
if pathConf.RunOnUnread != "" {
defer func() {
c.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnUnread,
false,
res.path.externalCmdEnv(),
nil)
}()
}
var err error var err error
w, err = rtmp.NewWriter(conn, videoFormat, audioFormat) w, err = rtmp.NewWriter(conn, videoFormat, audioFormat)
if err != nil { if err != nil {
@ -656,8 +655,8 @@ func (c *rtmpConn) apiItem() *apiRTMPConn {
bytesSent := uint64(0) bytesSent := uint64(0)
if c.conn != nil { if c.conn != nil {
bytesReceived = c.conn.BytesReceived() bytesReceived = c.rconn.BytesReceived()
bytesSent = c.conn.BytesSent() bytesSent = c.rconn.BytesSent()
} }
return &apiRTMPConn{ return &apiRTMPConn{

4
internal/core/rtmp_server.go

@ -55,6 +55,7 @@ type rtmpServer struct {
rtspAddress string rtspAddress string
runOnConnect string runOnConnect string
runOnConnectRestart bool runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
metrics *metrics metrics *metrics
pathManager *pathManager pathManager *pathManager
@ -86,6 +87,7 @@ func newRTMPServer(
rtspAddress string, rtspAddress string,
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
metrics *metrics, metrics *metrics,
pathManager *pathManager, pathManager *pathManager,
@ -117,6 +119,7 @@ func newRTMPServer(
rtspAddress: rtspAddress, rtspAddress: rtspAddress,
runOnConnect: runOnConnect, runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart, runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
isTLS: isTLS, isTLS: isTLS,
externalCmdPool: externalCmdPool, externalCmdPool: externalCmdPool,
metrics: metrics, metrics: metrics,
@ -188,6 +191,7 @@ outer:
s.writeQueueSize, s.writeQueueSize,
s.runOnConnect, s.runOnConnect,
s.runOnConnectRestart, s.runOnConnectRestart,
s.runOnDisconnect,
&s.wg, &s.wg,
nconn, nconn,
s.externalCmdPool, s.externalCmdPool,

31
internal/core/rtmp_server_test.go

@ -15,36 +15,6 @@ import (
"github.com/bluenviron/mediamtx/internal/rtmp" "github.com/bluenviron/mediamtx/internal/rtmp"
) )
func TestRTMPServerRunOnConnect(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "rtspss-runonconnect-")
require.NoError(t, err)
f.Close()
defer os.Remove(f.Name())
p, ok := newInstance(
"runOnConnect: sh -c 'echo aa > " + f.Name() + "'\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
u, err := url.Parse("rtmp://127.0.0.1:1935/teststream")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
_, err = rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
byts, err := os.ReadFile(f.Name())
require.NoError(t, err)
require.Equal(t, "aa\n", string(byts))
}
func TestRTMPServer(t *testing.T) { func TestRTMPServer(t *testing.T) {
for _, encrypt := range []string{ for _, encrypt := range []string{
"plain", "plain",
@ -179,6 +149,7 @@ func TestRTMPServer(t *testing.T) {
r, err := rtmp.NewReader(conn2) r, err := rtmp.NewReader(conn2)
require.NoError(t, err) require.NoError(t, err)
videoTrack1, audioTrack2 := r.Tracks() videoTrack1, audioTrack2 := r.Tracks()
require.Equal(t, videoTrack, videoTrack1) require.Equal(t, videoTrack, videoTrack1)
require.Equal(t, audioTrack, audioTrack2) require.Equal(t, audioTrack, audioTrack2)

81
internal/core/rtsp_conn.go

@ -27,19 +27,17 @@ type rtspConnParent interface {
} }
type rtspConn struct { type rtspConn struct {
rtspAddress string *conn
authMethods []headers.AuthMethod
readTimeout conf.StringDuration rtspAddress string
runOnConnect string authMethods []headers.AuthMethod
runOnConnectRestart bool readTimeout conf.StringDuration
externalCmdPool *externalcmd.Pool pathManager *pathManager
pathManager *pathManager rconn *gortsplib.ServerConn
conn *gortsplib.ServerConn parent rtspConnParent
parent rtspConnParent
uuid uuid.UUID uuid uuid.UUID
created time.Time created time.Time
onConnectCmd *externalcmd.Cmd
authNonce string authNonce string
authFailures int authFailures int
} }
@ -50,72 +48,61 @@ func newRTSPConn(
readTimeout conf.StringDuration, readTimeout conf.StringDuration,
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
pathManager *pathManager, pathManager *pathManager,
conn *gortsplib.ServerConn, conn *gortsplib.ServerConn,
parent rtspConnParent, parent rtspConnParent,
) *rtspConn { ) *rtspConn {
c := &rtspConn{ c := &rtspConn{
rtspAddress: rtspAddress, rtspAddress: rtspAddress,
authMethods: authMethods, authMethods: authMethods,
readTimeout: readTimeout, readTimeout: readTimeout,
runOnConnect: runOnConnect, pathManager: pathManager,
runOnConnectRestart: runOnConnectRestart, rconn: conn,
externalCmdPool: externalCmdPool, parent: parent,
pathManager: pathManager, uuid: uuid.New(),
conn: conn, created: time.Now(),
parent: parent,
uuid: uuid.New(),
created: time.Now(),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
if c.runOnConnect != "" { c.conn.open()
c.Log(logger.Info, "runOnConnect command started")
_, port, _ := net.SplitHostPort(c.rtspAddress)
c.onConnectCmd = externalcmd.NewCmd(
c.externalCmdPool,
c.runOnConnect,
c.runOnConnectRestart,
externalcmd.Environment{
"MTX_PATH": "",
"RTSP_PATH": "", // deprecated
"RTSP_PORT": port,
},
func(err error) {
c.Log(logger.Info, "runOnInit command exited: %v", err)
})
}
return c return c
} }
func (c *rtspConn) Log(level logger.Level, format string, args ...interface{}) { func (c *rtspConn) Log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.rconn.NetConn().RemoteAddr()}, args...)...)
} }
// Conn returns the RTSP connection. // Conn returns the RTSP connection.
func (c *rtspConn) Conn() *gortsplib.ServerConn { func (c *rtspConn) Conn() *gortsplib.ServerConn {
return c.conn return c.rconn
} }
func (c *rtspConn) remoteAddr() net.Addr { func (c *rtspConn) remoteAddr() net.Addr {
return c.conn.NetConn().RemoteAddr() return c.rconn.NetConn().RemoteAddr()
} }
func (c *rtspConn) ip() net.IP { func (c *rtspConn) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP return c.rconn.NetConn().RemoteAddr().(*net.TCPAddr).IP
} }
// onClose is called by rtspServer. // onClose is called by rtspServer.
func (c *rtspConn) onClose(err error) { func (c *rtspConn) onClose(err error) {
c.Log(logger.Info, "closed: %v", err) c.Log(logger.Info, "closed: %v", err)
if c.onConnectCmd != nil { c.conn.close()
c.onConnectCmd.Close()
c.Log(logger.Info, "runOnConnect command stopped")
}
} }
// onRequest is called by rtspServer. // onRequest is called by rtspServer.
@ -231,7 +218,7 @@ func (c *rtspConn) apiItem() *apiRTSPConn {
ID: c.uuid, ID: c.uuid,
Created: c.created, Created: c.created,
RemoteAddr: c.remoteAddr().String(), RemoteAddr: c.remoteAddr().String(),
BytesReceived: c.conn.BytesReceived(), BytesReceived: c.rconn.BytesReceived(),
BytesSent: c.conn.BytesSent(), BytesSent: c.rconn.BytesSent(),
} }
} }

4
internal/core/rtsp_server.go

@ -48,6 +48,7 @@ type rtspServer struct {
protocols map[conf.Protocol]struct{} protocols map[conf.Protocol]struct{}
runOnConnect string runOnConnect string
runOnConnectRestart bool runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
metrics *metrics metrics *metrics
pathManager *pathManager pathManager *pathManager
@ -82,6 +83,7 @@ func newRTSPServer(
protocols map[conf.Protocol]struct{}, protocols map[conf.Protocol]struct{},
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
metrics *metrics, metrics *metrics,
pathManager *pathManager, pathManager *pathManager,
@ -97,6 +99,7 @@ func newRTSPServer(
protocols: protocols, protocols: protocols,
runOnConnect: runOnConnect, runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart, runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool, externalCmdPool: externalCmdPool,
metrics: metrics, metrics: metrics,
pathManager: pathManager, pathManager: pathManager,
@ -219,6 +222,7 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
s.readTimeout, s.readTimeout,
s.runOnConnect, s.runOnConnect,
s.runOnConnectRestart, s.runOnConnectRestart,
s.runOnDisconnect,
s.externalCmdPool, s.externalCmdPool,
s.pathManager, s.pathManager,
ctx.Conn, ctx.Conn,

30
internal/core/rtsp_server_test.go

@ -1,9 +1,7 @@
package core package core
import ( import (
"os"
"testing" "testing"
"time"
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
@ -12,34 +10,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestRTSPServerRunOnConnect(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), "rtspss-runonconnect-")
require.NoError(t, err)
f.Close()
defer os.Remove(f.Name())
p, ok := newInstance(
"runOnConnect: sh -c 'echo aa > " + f.Name() + "'\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err = source.StartRecording(
"rtsp://127.0.0.1:8554/mypath",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer source.Close()
time.Sleep(500 * time.Millisecond)
byts, err := os.ReadFile(f.Name())
require.NoError(t, err)
require.Equal(t, "aa\n", string(byts))
}
func TestRTSPServer(t *testing.T) { func TestRTSPServer(t *testing.T) {
for _, auth := range []string{ for _, auth := range []string{
"none", "none",

28
internal/core/rtsp_session.go

@ -96,14 +96,27 @@ func (s *rtspSession) Log(level logger.Level, format string, args ...interface{}
s.parent.Log(level, "[session %s] "+format, append([]interface{}{id}, args...)...) s.parent.Log(level, "[session %s] "+format, append([]interface{}{id}, args...)...)
} }
func (s *rtspSession) onUnread() {
if s.onReadCmd != nil {
s.Log(logger.Info, "runOnRead command stopped")
s.onReadCmd.Close()
}
if s.path.conf.RunOnUnread != "" {
s.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
s.externalCmdPool,
s.path.conf.RunOnUnread,
false,
s.path.externalCmdEnv(),
nil)
}
}
// onClose is called by rtspServer. // onClose is called by rtspServer.
func (s *rtspSession) onClose(err error) { func (s *rtspSession) onClose(err error) {
if s.session.State() == gortsplib.ServerSessionStatePlay { if s.session.State() == gortsplib.ServerSessionStatePlay {
if s.onReadCmd != nil { s.onUnread()
s.onReadCmd.Close()
s.onReadCmd = nil
s.Log(logger.Info, "runOnRead command stopped")
}
} }
switch s.session.State() { switch s.session.State() {
@ -363,10 +376,7 @@ func (s *rtspSession) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Res
func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.session.State() { switch s.session.State() {
case gortsplib.ServerSessionStatePlay: case gortsplib.ServerSessionStatePlay:
if s.onReadCmd != nil { s.onUnread()
s.Log(logger.Info, "runOnRead command stopped")
s.onReadCmd.Close()
}
s.mutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay s.state = gortsplib.ServerSessionStatePrePlay

42
internal/core/srt_conn.go

@ -48,6 +48,9 @@ type srtConnParent interface {
} }
type srtConn struct { type srtConn struct {
*conn
rtspAddress string
readTimeout conf.StringDuration readTimeout conf.StringDuration
writeTimeout conf.StringDuration writeTimeout conf.StringDuration
writeQueueSize int writeQueueSize int
@ -65,7 +68,7 @@ type srtConn struct {
mutex sync.RWMutex mutex sync.RWMutex
state srtConnState state srtConnState
pathName string pathName string
conn srt.Conn sconn srt.Conn
chNew chan srtNewConnReq chNew chan srtNewConnReq
chSetConn chan srt.Conn chSetConn chan srt.Conn
@ -73,11 +76,15 @@ type srtConn struct {
func newSRTConn( func newSRTConn(
parentCtx context.Context, parentCtx context.Context,
rtspAddress string,
readTimeout conf.StringDuration, readTimeout conf.StringDuration,
writeTimeout conf.StringDuration, writeTimeout conf.StringDuration,
writeQueueSize int, writeQueueSize int,
udpMaxPayloadSize int, udpMaxPayloadSize int,
connReq srt.ConnRequest, connReq srt.ConnRequest,
runOnConnect string,
runOnConnectRestart bool,
runOnDisconnect string,
wg *sync.WaitGroup, wg *sync.WaitGroup,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
pathManager srtConnPathManager, pathManager srtConnPathManager,
@ -86,6 +93,7 @@ func newSRTConn(
ctx, ctxCancel := context.WithCancel(parentCtx) ctx, ctxCancel := context.WithCancel(parentCtx)
c := &srtConn{ c := &srtConn{
rtspAddress: rtspAddress,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize, writeQueueSize: writeQueueSize,
@ -103,6 +111,15 @@ func newSRTConn(
chSetConn: make(chan srt.Conn), chSetConn: make(chan srt.Conn),
} }
c.conn = newConn(
rtspAddress,
runOnConnect,
runOnConnectRestart,
runOnDisconnect,
externalCmdPool,
c,
)
c.Log(logger.Info, "opened") c.Log(logger.Info, "opened")
c.wg.Add(1) c.wg.Add(1)
@ -123,9 +140,12 @@ func (c *srtConn) ip() net.IP {
return c.connReq.RemoteAddr().(*net.UDPAddr).IP return c.connReq.RemoteAddr().(*net.UDPAddr).IP
} }
func (c *srtConn) run() { func (c *srtConn) run() { //nolint:dupl
defer c.wg.Done() defer c.wg.Done()
c.conn.open()
defer c.conn.close()
err := c.runInner() err := c.runInner()
c.ctxCancel() c.ctxCancel()
@ -208,7 +228,7 @@ func (c *srtConn) runPublish(req srtNewConnReq, pathName string, user string, pa
c.mutex.Lock() c.mutex.Lock()
c.state = srtConnStatePublish c.state = srtConnStatePublish
c.pathName = pathName c.pathName = pathName
c.conn = sconn c.sconn = sconn
c.mutex.Unlock() c.mutex.Unlock()
readerErr := make(chan error) readerErr := make(chan error)
@ -455,7 +475,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
c.mutex.Lock() c.mutex.Lock()
c.state = srtConnStateRead c.state = srtConnStateRead
c.pathName = pathName c.pathName = pathName
c.conn = sconn c.sconn = sconn
c.mutex.Unlock() c.mutex.Unlock()
writer := asyncwriter.New(c.writeQueueSize, c) writer := asyncwriter.New(c.writeQueueSize, c)
@ -711,6 +731,18 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}() }()
} }
if pathConf.RunOnUnread != "" {
defer func() {
c.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnUnread,
false,
res.path.externalCmdEnv(),
nil)
}()
}
w = mpegts.NewWriter(bw, tracks) w = mpegts.NewWriter(bw, tracks)
// disable read deadline // disable read deadline
@ -781,7 +813,7 @@ func (c *srtConn) apiItem() *apiSRTConn {
if c.conn != nil { if c.conn != nil {
var s srt.Statistics var s srt.Statistics
c.conn.Stats(&s) c.sconn.Stats(&s)
bytesReceived = s.Accumulated.ByteRecv bytesReceived = s.Accumulated.ByteRecv
bytesSent = s.Accumulated.ByteSent bytesSent = s.Accumulated.ByteSent
} }

64
internal/core/srt_server.go

@ -57,13 +57,17 @@ type srtServerParent interface {
} }
type srtServer struct { type srtServer struct {
readTimeout conf.StringDuration rtspAddress string
writeTimeout conf.StringDuration readTimeout conf.StringDuration
writeQueueSize int writeTimeout conf.StringDuration
udpMaxPayloadSize int writeQueueSize int
externalCmdPool *externalcmd.Pool udpMaxPayloadSize int
pathManager *pathManager runOnConnect string
parent srtServerParent runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
pathManager *pathManager
parent srtServerParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@ -82,10 +86,14 @@ type srtServer struct {
func newSRTServer( func newSRTServer(
address string, address string,
rtspAddress string,
readTimeout conf.StringDuration, readTimeout conf.StringDuration,
writeTimeout conf.StringDuration, writeTimeout conf.StringDuration,
writeQueueSize int, writeQueueSize int,
udpMaxPayloadSize int, udpMaxPayloadSize int,
runOnConnect string,
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
pathManager *pathManager, pathManager *pathManager,
parent srtServerParent, parent srtServerParent,
@ -102,23 +110,27 @@ func newSRTServer(
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
s := &srtServer{ s := &srtServer{
readTimeout: readTimeout, rtspAddress: rtspAddress,
writeTimeout: writeTimeout, readTimeout: readTimeout,
writeQueueSize: writeQueueSize, writeTimeout: writeTimeout,
udpMaxPayloadSize: udpMaxPayloadSize, writeQueueSize: writeQueueSize,
externalCmdPool: externalCmdPool, udpMaxPayloadSize: udpMaxPayloadSize,
pathManager: pathManager, runOnConnect: runOnConnect,
parent: parent, runOnConnectRestart: runOnConnectRestart,
ctx: ctx, runOnDisconnect: runOnDisconnect,
ctxCancel: ctxCancel, externalCmdPool: externalCmdPool,
ln: ln, pathManager: pathManager,
conns: make(map[*srtConn]struct{}), parent: parent,
chNewConnRequest: make(chan srtNewConnReq), ctx: ctx,
chAcceptErr: make(chan error), ctxCancel: ctxCancel,
chCloseConn: make(chan *srtConn), ln: ln,
chAPIConnsList: make(chan srtServerAPIConnsListReq), conns: make(map[*srtConn]struct{}),
chAPIConnsGet: make(chan srtServerAPIConnsGetReq), chNewConnRequest: make(chan srtNewConnReq),
chAPIConnsKick: make(chan srtServerAPIConnsKickReq), chAcceptErr: make(chan error),
chCloseConn: make(chan *srtConn),
chAPIConnsList: make(chan srtServerAPIConnsListReq),
chAPIConnsGet: make(chan srtServerAPIConnsGetReq),
chAPIConnsKick: make(chan srtServerAPIConnsKickReq),
} }
s.Log(logger.Info, "listener opened on "+address+" (UDP)") s.Log(logger.Info, "listener opened on "+address+" (UDP)")
@ -159,11 +171,15 @@ outer:
case req := <-s.chNewConnRequest: case req := <-s.chNewConnRequest:
c := newSRTConn( c := newSRTConn(
s.ctx, s.ctx,
s.rtspAddress,
s.readTimeout, s.readTimeout,
s.writeTimeout, s.writeTimeout,
s.writeQueueSize, s.writeQueueSize,
s.udpMaxPayloadSize, s.udpMaxPayloadSize,
req.connReq, req.connReq,
s.runOnConnect,
s.runOnConnectRestart,
s.runOnDisconnect,
&s.wg, &s.wg,
s.externalCmdPool, s.externalCmdPool,
s.pathManager, s.pathManager,

19
internal/core/webrtc_manager.go

@ -20,6 +20,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -276,13 +277,14 @@ type webRTCManagerParent interface {
} }
type webRTCManager struct { type webRTCManager struct {
allowOrigin string allowOrigin string
trustedProxies conf.IPsOrCIDRs trustedProxies conf.IPsOrCIDRs
iceServers []conf.WebRTCICEServer iceServers []conf.WebRTCICEServer
writeQueueSize int writeQueueSize int
pathManager *pathManager externalCmdPool *externalcmd.Pool
metrics *metrics pathManager *pathManager
parent webRTCManagerParent metrics *metrics
parent webRTCManagerParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@ -318,6 +320,7 @@ func newWebRTCManager(
iceHostNAT1To1IPs []string, iceHostNAT1To1IPs []string,
iceUDPMuxAddress string, iceUDPMuxAddress string,
iceTCPMuxAddress string, iceTCPMuxAddress string,
externalCmdPool *externalcmd.Pool,
pathManager *pathManager, pathManager *pathManager,
metrics *metrics, metrics *metrics,
parent webRTCManagerParent, parent webRTCManagerParent,
@ -329,6 +332,7 @@ func newWebRTCManager(
trustedProxies: trustedProxies, trustedProxies: trustedProxies,
iceServers: iceServers, iceServers: iceServers,
writeQueueSize: writeQueueSize, writeQueueSize: writeQueueSize,
externalCmdPool: externalCmdPool,
pathManager: pathManager, pathManager: pathManager,
metrics: metrics, metrics: metrics,
parent: parent, parent: parent,
@ -440,6 +444,7 @@ outer:
m.api, m.api,
req, req,
&wg, &wg,
m.externalCmdPool,
m.pathManager, m.pathManager,
m, m,
) )

48
internal/core/webrtc_session.go

@ -17,6 +17,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc" "github.com/bluenviron/mediamtx/internal/webrtcpc"
) )
@ -176,12 +177,13 @@ type webRTCSessionPathManager interface {
} }
type webRTCSession struct { type webRTCSession struct {
writeQueueSize int writeQueueSize int
api *webrtc.API api *webrtc.API
req webRTCNewSessionReq req webRTCNewSessionReq
wg *sync.WaitGroup wg *sync.WaitGroup
pathManager webRTCSessionPathManager externalCmdPool *externalcmd.Pool
parent *webRTCManager pathManager webRTCSessionPathManager
parent *webRTCManager
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
@ -201,6 +203,7 @@ func newWebRTCSession(
api *webrtc.API, api *webrtc.API,
req webRTCNewSessionReq, req webRTCNewSessionReq,
wg *sync.WaitGroup, wg *sync.WaitGroup,
externalCmdPool *externalcmd.Pool,
pathManager webRTCSessionPathManager, pathManager webRTCSessionPathManager,
parent *webRTCManager, parent *webRTCManager,
) *webRTCSession { ) *webRTCSession {
@ -211,8 +214,9 @@ func newWebRTCSession(
api: api, api: api,
req: req, req: req,
wg: wg, wg: wg,
parent: parent, externalCmdPool: externalCmdPool,
pathManager: pathManager, pathManager: pathManager,
parent: parent,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
created: time.Now(), created: time.Now(),
@ -523,6 +527,36 @@ func (s *webRTCSession) runRead() (int, error) {
s.Log(logger.Info, "is reading from path '%s', %s", s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))
pathConf := res.path.safeConf()
if pathConf.RunOnRead != "" {
s.Log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
s.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
res.path.externalCmdEnv(),
func(err error) {
s.Log(logger.Info, "runOnRead command exited: %v", err)
})
defer func() {
onReadCmd.Close()
s.Log(logger.Info, "runOnRead command stopped")
}()
}
if pathConf.RunOnUnread != "" {
defer func() {
s.Log(logger.Info, "runOnUnread command launched")
externalcmd.NewCmd(
s.externalCmdPool,
pathConf.RunOnUnread,
false,
res.path.externalCmdEnv(),
nil)
}()
}
writer.Start() writer.Start()
select { select {

18
internal/externalcmd/cmd.go

@ -3,6 +3,7 @@ package externalcmd
import ( import (
"errors" "errors"
"fmt"
"strings" "strings"
"time" "time"
) )
@ -13,6 +14,9 @@ const (
var errTerminated = errors.New("terminated") var errTerminated = errors.New("terminated")
// OnExitFunc is the prototype of onExit.
type OnExitFunc func(error)
// Environment is a Cmd environment. // Environment is a Cmd environment.
type Environment map[string]string type Environment map[string]string
@ -34,7 +38,7 @@ func NewCmd(
cmdstr string, cmdstr string,
restart bool, restart bool,
env Environment, env Environment,
onExit func(error), onExit OnExitFunc,
) *Cmd { ) *Cmd {
// replace variables in both Linux and Windows, in order to allow using the // replace variables in both Linux and Windows, in order to allow using the
// same commands on both of them. // same commands on both of them.
@ -42,6 +46,10 @@ func NewCmd(
cmdstr = strings.ReplaceAll(cmdstr, "$"+key, val) cmdstr = strings.ReplaceAll(cmdstr, "$"+key, val)
} }
if onExit == nil {
onExit = func(_ error) {}
}
e := &Cmd{ e := &Cmd{
pool: pool, pool: pool,
cmdstr: cmdstr, cmdstr: cmdstr,
@ -72,13 +80,15 @@ func (e *Cmd) run() {
return return
} }
e.onExit(err)
if !e.restart { if !e.restart {
<-e.terminate if err != nil {
e.onExit(err)
}
return return
} }
e.onExit(fmt.Errorf("command exited with code 0"))
select { select {
case <-time.After(restartPause): case <-time.After(restartPause):
case <-e.terminate: case <-e.terminate:

5
internal/externalcmd/cmd_unix.go

@ -55,6 +55,9 @@ func (e *Cmd) runOSSpecific() error {
return errTerminated return errTerminated
case c := <-cmdDone: case c := <-cmdDone:
return fmt.Errorf("command returned code %d", c) if c != 0 {
return fmt.Errorf("command exited with code %d", c)
}
return nil
} }
} }

5
internal/externalcmd/cmd_win.go

@ -76,6 +76,9 @@ func (e *Cmd) runOSSpecific() error {
return errTerminated return errTerminated
case c := <-cmdDone: case c := <-cmdDone:
return fmt.Errorf("command returned code %d", c) if c != nil {
return fmt.Errorf("command exited with code %d", c)
}
return nil
} }
} }

18
mediamtx.yml

@ -53,13 +53,15 @@ pprof: no
pprofAddress: 127.0.0.1:9999 pprofAddress: 127.0.0.1:9999
# Command to run when a client connects to the server. # Command to run when a client connects to the server.
# Prepend ./ to run an executable in the current folder (example: "./ffmpeg")
# This is terminated with SIGINT when a client disconnects from the server. # This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available: # The following environment variables are available:
# * RTSP_PORT: RTSP server port # * RTSP_PORT: RTSP server port
runOnConnect: runOnConnect:
# Restart the command if it exits. # Restart the command if it exits.
runOnConnectRestart: no runOnConnectRestart: no
# Command to run when a client disconnects from the server.
# Environment variables are the same of runOnConnect.
runOnDisconnect:
############################################### ###############################################
# RTSP settings # RTSP settings
@ -448,7 +450,6 @@ paths:
# Command to run when this path is initialized. # Command to run when this path is initialized.
# This can be used to publish a stream and keep it always opened. # This can be used to publish a stream and keep it always opened.
# Prepend ./ to run an executable in the current folder (example: "./ffmpeg")
# This is terminated with SIGINT when the program closes. # This is terminated with SIGINT when the program closes.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -461,7 +462,6 @@ paths:
# Command to run when this path is requested. # Command to run when this path is requested.
# This can be used to publish a stream on demand. # This can be used to publish a stream on demand.
# Prepend ./ to run an executable in the current folder (example: "./ffmpeg")
# This is terminated with SIGINT when the path is not requested anymore. # This is terminated with SIGINT when the path is not requested anymore.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -478,9 +478,8 @@ paths:
# readers connected and this amount of time has passed. # readers connected and this amount of time has passed.
runOnDemandCloseAfter: 10s runOnDemandCloseAfter: 10s
# Command to run when the stream is ready to be read, whether it is # Command to run when the stream is ready to be read, whenever it is
# published by a client or pulled from a server / camera. # published by a client or pulled from a server / camera.
# Prepend ./ to run an executable in the current folder (example: "./ffmpeg")
# This is terminated with SIGINT when the stream is not ready anymore. # This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -490,9 +489,11 @@ paths:
runOnReady: runOnReady:
# Restart the command if it exits. # Restart the command if it exits.
runOnReadyRestart: no runOnReadyRestart: no
# Command to run when the stream is not available anymore.
# Environment variables are the same of runOnReady.
runOnNotReady:
# Command to run when a clients starts reading. # Command to run when a client starts reading.
# Prepend ./ to run an executable in the current folder (example: "./ffmpeg")
# This is terminated with SIGINT when a client stops reading. # This is terminated with SIGINT when a client stops reading.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -502,3 +503,6 @@ paths:
runOnRead: runOnRead:
# Restart the command if it exits. # Restart the command if it exits.
runOnReadRestart: no runOnReadRestart: no
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread:

Loading…
Cancel
Save