Browse Source

add runOnRecordSegmentComplete and rclone integration (#2404) (#2428)

pull/2437/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
eb975027b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      README.md
  2. 2
      apidocs/openapi.yaml
  3. 63
      internal/conf/path.go
  4. 14
      internal/core/path.go
  5. 35
      internal/record/agent.go
  6. 8
      internal/record/agent_test.go
  7. 1
      internal/record/cleaner.go
  8. 4
      internal/record/segment.go
  9. 21
      mediamtx.yml

51
README.md

@ -1158,6 +1158,34 @@ Currently the server supports recording tracks encoded with the following codecs @@ -1158,6 +1158,34 @@ Currently the server supports recording tracks encoded with the following codecs
* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG
* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3
To upload recordings to a remote location, you can use _MediaMTX_ together with [rclone](https://github.com/rclone/rclone), a command line tool that provides file synchronization capabilities with a huge variety of services (including S3, FTP, SMB, Google Drive):
1. Download and install [rclone](https://github.com/rclone/rclone).
2. Configure _rclone_:
```
rclone config
```
3. Place `rclone` into the `runOnInit` and `runOnRecordSegmentComplete` hooks:
```yml
record: yes
paths:
mypath:
# this is needed to sync segments after a crash.
# replace myconfig with the name of the rclone config.
runOnInit: rclone sync -v ./recordings myconfig:/my-path/recordings
# this is called when a segment has been finalized.
# replace myconfig with the name of the rclone config.
runOnRecordSegmentComplete: rclone sync -v --min-age=1ms ./recordings myconfig:/my-path/recordings
```
If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`.
### Forward streams to another server
To forward incoming streams to another server, use _FFmpeg_ inside the `runOnReady` parameter:
@ -1311,7 +1339,7 @@ paths: @@ -1311,7 +1339,7 @@ paths:
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnReady: curl http://my-custom-server/webhook?source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
runOnReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
# Restart the command if it exits.
runOnReadyRestart: no
```
@ -1322,7 +1350,7 @@ paths: @@ -1322,7 +1350,7 @@ paths:
paths:
mypath:
# Environment variables are the same of runOnReady.
runOnNotReady: curl http://my-custom-server/webhook?source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
```
`runOnRead` allows to run a command when a client starts reading:
@ -1338,7 +1366,7 @@ paths: @@ -1338,7 +1366,7 @@ paths:
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
runOnRead: curl http://my-custom-server/webhook?reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
runOnRead: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
# Restart the command if it exits.
runOnReadRestart: no
```
@ -1350,7 +1378,22 @@ paths: @@ -1350,7 +1378,22 @@ paths:
mypath:
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread: curl http://my-custom-server/webhook?reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
runOnUnread: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
```
`runOnRecordSegmentComplete` allows to run a command when a record segment is complete:
```yml
paths:
mypath:
# Command to run when a record segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```
### API

2
apidocs/openapi.yaml

@ -349,6 +349,8 @@ components: @@ -349,6 +349,8 @@ components:
type: boolean
runOnUnread:
type: string
runOnRecordSegmentComplete:
type: string
Path:
type: object

63
internal/conf/path.go

@ -120,18 +120,19 @@ type PathConf struct { @@ -120,18 +120,19 @@ type PathConf struct {
RPICameraTextOverlay string `json:"rpiCameraTextOverlay"`
// Hooks
RunOnInit string `json:"runOnInit"`
RunOnInitRestart bool `json:"runOnInitRestart"`
RunOnDemand string `json:"runOnDemand"`
RunOnDemandRestart bool `json:"runOnDemandRestart"`
RunOnDemandStartTimeout StringDuration `json:"runOnDemandStartTimeout"`
RunOnDemandCloseAfter StringDuration `json:"runOnDemandCloseAfter"`
RunOnReady string `json:"runOnReady"`
RunOnReadyRestart bool `json:"runOnReadyRestart"`
RunOnNotReady string `json:"runOnNotReady"`
RunOnRead string `json:"runOnRead"`
RunOnReadRestart bool `json:"runOnReadRestart"`
RunOnUnread string `json:"runOnUnread"`
RunOnInit string `json:"runOnInit"`
RunOnInitRestart bool `json:"runOnInitRestart"`
RunOnDemand string `json:"runOnDemand"`
RunOnDemandRestart bool `json:"runOnDemandRestart"`
RunOnDemandStartTimeout StringDuration `json:"runOnDemandStartTimeout"`
RunOnDemandCloseAfter StringDuration `json:"runOnDemandCloseAfter"`
RunOnReady string `json:"runOnReady"`
RunOnReadyRestart bool `json:"runOnReadyRestart"`
RunOnNotReady string `json:"runOnNotReady"`
RunOnRead string `json:"runOnRead"`
RunOnReadRestart bool `json:"runOnReadRestart"`
RunOnUnread string `json:"runOnUnread"`
RunOnRecordSegmentComplete string `json:"runOnRecordSegmentComplete"`
}
func (pconf *PathConf) check(conf *Conf, name string) error {
@ -319,6 +320,12 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -319,6 +320,12 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
return fmt.Errorf("'sourceOnDemand' is useless when source is 'publisher'")
}
}
if pconf.SRTReadPassphrase != "" {
err := srtCheckPassphrase(pconf.SRTReadPassphrase)
if err != nil {
return fmt.Errorf("invalid 'readRTPassphrase': %v", err)
}
}
// Publisher
@ -326,6 +333,10 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -326,6 +333,10 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
pconf.OverridePublisher = true
}
if pconf.Fallback != "" {
if pconf.Source != "publisher" {
return fmt.Errorf("'fallback' can only be used when source is 'publisher'")
}
if strings.HasPrefix(pconf.Fallback, "/") {
err := IsValidPathName(pconf.Fallback[1:])
if err != nil {
@ -338,6 +349,16 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -338,6 +349,16 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
}
}
}
if pconf.SRTPublishPassphrase != "" {
if pconf.Source != "publisher" {
return fmt.Errorf("'srtPublishPassphase' can only be used when source is 'publisher'")
}
err := srtCheckPassphrase(pconf.SRTPublishPassphrase)
if err != nil {
return fmt.Errorf("invalid 'srtPublishPassphrase': %v", err)
}
}
// Authentication
@ -374,25 +395,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -374,25 +395,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
}
}
// SRT
if pconf.SRTPublishPassphrase != "" {
err := srtCheckPassphrase(pconf.SRTPublishPassphrase)
if err != nil {
return fmt.Errorf("invalid 'srtPublishPassphrase': %v", err)
}
}
if pconf.SRTReadPassphrase != "" {
err := srtCheckPassphrase(pconf.SRTReadPassphrase)
if err != nil {
return fmt.Errorf("invalid 'readRTPassphrase': %v", err)
}
}
// Hooks
if pconf.RunOnInit != "" && pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path")
return fmt.Errorf("a path with a regular expression (or path 'all')" +
" does not support option 'runOnInit'; use another path")
}
if pconf.RunOnDemand != "" && pconf.Source != "publisher" {
return fmt.Errorf("'runOnDemand' can be used only when source is 'publisher'")

14
internal/core/path.go

@ -975,6 +975,20 @@ func (pa *path) startRecording() { @@ -975,6 +975,20 @@ func (pa *path) startRecording() {
time.Duration(pa.recordSegmentDuration),
pa.name,
pa.stream,
func(segmentPath string) {
if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
pa.Log(logger.Info, "runOnRecordSegmentComplete command launched")
externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnRecordSegmentComplete,
false,
env,
nil)
}
},
pa,
)
}

35
internal/record/agent.go

@ -4,7 +4,6 @@ import ( @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"path/filepath"
"strings"
"time"
@ -108,11 +107,12 @@ type sample struct { @@ -108,11 +107,12 @@ type sample struct {
// Agent saves streams on disk.
type Agent struct {
path string
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
parent logger.Writer
path string
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
onSegmentComplete func(string)
parent logger.Writer
ctx context.Context
ctxCancel func()
@ -132,23 +132,28 @@ func NewAgent( @@ -132,23 +132,28 @@ func NewAgent(
segmentDuration time.Duration,
pathName string,
stream *stream.Stream,
onSegmentComplete func(string),
parent logger.Writer,
) *Agent {
recordPath, _ = filepath.Abs(recordPath)
recordPath = strings.ReplaceAll(recordPath, "%path", pathName)
recordPath += ".mp4"
if onSegmentComplete == nil {
onSegmentComplete = func(_ string) {}
}
ctx, ctxCancel := context.WithCancel(context.Background())
r := &Agent{
path: recordPath,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
done: make(chan struct{}),
path: recordPath,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
onSegmentComplete: onSegmentComplete,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
done: make(chan struct{}),
}
r.writer = asyncwriter.New(writeQueueSize, r)

8
internal/record/agent_test.go

@ -77,6 +77,8 @@ func TestAgent(t *testing.T) { @@ -77,6 +77,8 @@ func TestAgent(t *testing.T) {
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
segDone := make(chan struct{}, 2)
a := NewAgent(
1024,
recordPath,
@ -84,6 +86,9 @@ func TestAgent(t *testing.T) { @@ -84,6 +86,9 @@ func TestAgent(t *testing.T) {
1*time.Second,
"mypath",
stream,
func(fpath string) {
segDone <- struct{}{}
},
&nilLogger{},
)
@ -140,7 +145,8 @@ func TestAgent(t *testing.T) { @@ -140,7 +145,8 @@ func TestAgent(t *testing.T) {
})
}
time.Sleep(500 * time.Millisecond)
<-segDone
<-segDone
a.Close()
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))

1
internal/record/cleaner.go

@ -55,7 +55,6 @@ func NewCleaner( @@ -55,7 +55,6 @@ func NewCleaner(
deleteAfter time.Duration,
parent logger.Writer,
) *Cleaner {
recordPath, _ = filepath.Abs(recordPath)
recordPath += ".mp4"
ctx, ctxCancel := context.WithCancel(context.Background())

4
internal/record/segment.go

@ -65,6 +65,10 @@ func (s *segment) close() error { @@ -65,6 +65,10 @@ func (s *segment) close() error {
if err == nil {
err = err2
}
if err2 == nil {
s.r.onSegmentComplete(s.fpath)
}
}
return err

21
mediamtx.yml

@ -249,7 +249,7 @@ recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f @@ -249,7 +249,7 @@ recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
# Format of recorded segments.
# Currently the only available format is fmp4 (fragmented MP4).
recordFormat: fmp4
# fMP4 files are concatenation of small MP4 files (parts), each with this duration.
# fMP4 segments are concatenation of small MP4 files (parts), each with this duration.
# When a system failure occurs, the last part gets lost.
# Therefore, the part duration is equal to the RPO (recovery point objective).
recordPartDuration: 100ms
@ -337,7 +337,7 @@ paths: @@ -337,7 +337,7 @@ paths:
# allow another client to disconnect the current publisher and publish in its place.
overridePublisher: yes
# if no one is publishing, redirect readers to this path.
# It can be can be a relative path (i.e. /otherstream) or an absolute RTSP URL.
# It can be can be a relative path (i.e. /otherstream) or an absolute RTSP URL.
fallback:
# SRT encryption passphrase required to publish to this path
srtPublishPassphrase:
@ -488,11 +488,11 @@ paths: @@ -488,11 +488,11 @@ paths:
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
runOnReady:
# Restart the command if it exits.
runOnReadyRestart: no
@ -504,14 +504,23 @@ paths: @@ -504,14 +504,23 @@ paths:
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
runOnRead:
# Restart the command if it exits.
runOnReadRestart: no
# Command to run when a client stops reading.
# Environment variables are the same of runOnRead.
runOnUnread:
# Command to run when a record segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentComplete:

Loading…
Cancel
Save