Browse Source

add playback server (#2452) (#2906)

* add playback server

* add playback switch

* update readme
pull/2935/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
57c2d5aecb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 40
      README.md
  2. 20
      apidocs/openapi.yaml
  3. 3
      go.mod
  4. 6
      go.sum
  5. 14
      internal/api/api.go
  6. 35
      internal/conf/conf.go
  7. 1
      internal/conf/conf_test.go
  8. 40
      internal/conf/credential.go
  9. 4
      internal/conf/credential_test.go
  10. 50
      internal/conf/path.go
  11. 65
      internal/core/core.go
  12. 77
      internal/core/path.go
  13. 183
      internal/core/path_manager.go
  14. 10
      internal/defs/path.go
  15. 2
      internal/defs/path_manager.go
  16. 410
      internal/playback/fmp4.go
  17. 79
      internal/playback/fmp4_test.go
  18. 299
      internal/playback/server.go
  19. 228
      internal/playback/server_test.go
  20. 13
      internal/record/agent_instance.go
  21. 50
      internal/record/cleaner.go
  22. 2
      internal/record/cleaner_test.go
  23. 10
      internal/record/format_fmp4_part.go
  24. 8
      internal/record/format_fmp4_segment.go
  25. 2
      internal/record/format_mpegts_segment.go
  26. 54
      internal/record/path.go
  27. 12
      internal/record/path_test.go
  28. 2
      internal/servers/hls/http_server.go
  29. 2
      internal/servers/webrtc/http_server.go
  30. 35
      mediamtx.yml

40
README.md

@ -56,6 +56,7 @@ And can be recorded with:
* Streams are automatically converted from a protocol to another * Streams are automatically converted from a protocol to another
* Serve multiple streams at once in separate paths * Serve multiple streams at once in separate paths
* Record streams to disk * Record streams to disk
* Playback recordings
* Authenticate users; use internal or external authentication * Authenticate users; use internal or external authentication
* Redirect readers to other RTSP servers (load balancing) * Redirect readers to other RTSP servers (load balancing)
* Query and control the server through the API * Query and control the server through the API
@ -115,6 +116,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Encrypt the configuration](#encrypt-the-configuration) * [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression) * [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Record streams to disk](#record-streams-to-disk) * [Record streams to disk](#record-streams-to-disk)
* [Playback recordings](#playback-recordings)
* [Forward streams to other servers](#forward-streams-to-other-servers) * [Forward streams to other servers](#forward-streams-to-other-servers)
* [Proxy requests to other servers](#proxy-requests-to-other-servers) * [Proxy requests to other servers](#proxy-requests-to-other-servers)
* [On-demand publishing](#on-demand-publishing) * [On-demand publishing](#on-demand-publishing)
@ -1183,6 +1185,44 @@ To upload recordings to a remote location, you can use _MediaMTX_ together with
If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`. If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`.
### Playback recordings
Recordings can be served to users through a dedicated HTTP server, that can be enabled inside the configuration:
```yml
playback: yes
playbackAddress: :9996
```
The server can be queried for recordings by using the URL:
```
http://localhost:9996/get?path=[mypath]&start=[start_date]&duration=[duration]&format=[format]
```
Where:
* [mypath] is the path name
* [start_date] is the start date in RFC3339 format
* [duration] is the maximum duration of the recording in Golang format (example: 20s, 20h)
* [format] must be fmp4
All parameters must be [url-encoded](https://www.urlencoder.org/).
For instance:
```
http://localhost:9996/get?path=stream2&start=2024-01-14T16%3A33%3A17%2B00%3A00&duration=200s&format=fmp4
```
The resulting stream is natively compatible with any browser, therefore its URL can be directly inserted into a \<video> tag:
```html
<video controls>
<source src="http://localhost:9996/get?path=stream2&start=2024-01-14T16%3A33%3A17%2B00%3A00&duration=200s&format=fmp4" type="video/mp4" />
</video>
```
### Forward streams to other servers ### Forward streams to other servers
To forward incoming streams to another server, use _FFmpeg_ inside the `runOnReady` parameter: To forward incoming streams to another server, use _FFmpeg_ inside the `runOnReady` parameter:

20
apidocs/openapi.yaml

@ -43,10 +43,6 @@ components:
type: integer type: integer
externalAuthenticationURL: externalAuthenticationURL:
type: string type: string
api:
type: boolean
apiAddress:
type: string
metrics: metrics:
type: boolean type: boolean
metricsAddress: metricsAddress:
@ -62,6 +58,18 @@ components:
runOnDisconnect: runOnDisconnect:
type: string type: string
# API
api:
type: boolean
apiAddress:
type: string
# Playback server
playback:
type: boolean
playbackAddress:
type: string
# RTSP server # RTSP server
rtsp: rtsp:
type: boolean type: boolean
@ -213,9 +221,11 @@ components:
fallback: fallback:
type: string type: string
# Record # Record and playback
record: record:
type: boolean type: boolean
playback:
type: boolean
recordPath: recordPath:
type: string type: string
recordFormat: recordFormat:

3
go.mod

@ -6,8 +6,7 @@ require (
code.cloudfoundry.org/bytefmt v0.0.0 code.cloudfoundry.org/bytefmt v0.0.0
github.com/abema/go-mp4 v1.2.0 github.com/abema/go-mp4 v1.2.0
github.com/alecthomas/kong v0.8.1 github.com/alecthomas/kong v0.8.1
github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad
github.com/bluenviron/gohlslib v1.2.0
github.com/bluenviron/gortsplib/v4 v4.7.1 github.com/bluenviron/gortsplib/v4 v4.7.1
github.com/bluenviron/mediacommon v1.9.0 github.com/bluenviron/mediacommon v1.9.0
github.com/datarhei/gosrt v0.5.7 github.com/datarhei/gosrt v0.5.7

6
go.sum

@ -12,16 +12,14 @@ github.com/aler9/sdp/v3 v3.0.0-20231022165400-33437e07f326 h1:HA7u47vkcxFiHtiOjm
github.com/aler9/sdp/v3 v3.0.0-20231022165400-33437e07f326/go.mod h1:I40uD/ZSmK2peI6AdJga5fd55d4bFK0oWOgLS9Q8sVc= github.com/aler9/sdp/v3 v3.0.0-20231022165400-33437e07f326/go.mod h1:I40uD/ZSmK2peI6AdJga5fd55d4bFK0oWOgLS9Q8sVc=
github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6 h1:wMd3D1mLghoYYh31STig8Kwm2qi8QyQKUy09qUUZrVw= github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6 h1:wMd3D1mLghoYYh31STig8Kwm2qi8QyQKUy09qUUZrVw=
github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs= github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs=
github.com/aler9/writerseeker v1.1.0 h1:t+Sm3tjp8scNlqyoa8obpeqwciMNOvdvsxjxEb3Sx3g=
github.com/aler9/writerseeker v1.1.0/go.mod h1:QNCcjSKnLsYoTfMmXkEEfgbz6nNXWxKSaBY+hGJGWDA=
github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c= github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c=
github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/bluenviron/gohlslib v1.2.0 h1:Hrx2/n/AcmKKIV+MjZLKs5kmW+O7xCdUSPJQoS39JKw= github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad h1:R9Lqf0A2/3TTB4casoU1LC+HRLmsVxNYUTmnbbD8WAE=
github.com/bluenviron/gohlslib v1.2.0/go.mod h1:kG/Sjebsxnf5asMGaGcQ0aSvtFGNChJPgctds2wDHOI= github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad/go.mod h1:k94WhiVkgJl45Q1WkLw8/GG2AJ1+VU9c/3i4f41xMq8=
github.com/bluenviron/gortsplib/v4 v4.7.1 h1:ZiPHjnIsdPDfPGZgfBr2n2xCFZlvmc/5zEqdoJUa1vU= github.com/bluenviron/gortsplib/v4 v4.7.1 h1:ZiPHjnIsdPDfPGZgfBr2n2xCFZlvmc/5zEqdoJUa1vU=
github.com/bluenviron/gortsplib/v4 v4.7.1/go.mod h1:3+IYh85PgIPLHr4D5z7GnRvpu/ogSHMDhsYW/CjrD8E= github.com/bluenviron/gortsplib/v4 v4.7.1/go.mod h1:3+IYh85PgIPLHr4D5z7GnRvpu/ogSHMDhsYW/CjrD8E=
github.com/bluenviron/mediacommon v1.9.0 h1:0I7PuwaDD6uOeQlV3WOlC/7FFESDa4dllYylj1YcnI4= github.com/bluenviron/mediacommon v1.9.0 h1:0I7PuwaDD6uOeQlV3WOlC/7FFESDa4dllYylj1YcnI4=

14
internal/api/api.go

@ -274,7 +274,7 @@ func (a *API) writeError(ctx *gin.Context, status int, err error) {
// show error in logs // show error in logs
a.Log(logger.Error, err.Error()) a.Log(logger.Error, err.Error())
// send error in response // add error to response
ctx.JSON(status, &defs.APIError{ ctx.JSON(status, &defs.APIError{
Error: err.Error(), Error: err.Error(),
}) })
@ -303,7 +303,7 @@ func (a *API) onConfigGlobalPatch(ctx *gin.Context) {
newConf.PatchGlobal(&c) newConf.PatchGlobal(&c)
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
@ -341,7 +341,7 @@ func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) {
newConf.PatchPathDefaults(&p) newConf.PatchPathDefaults(&p)
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
@ -422,7 +422,7 @@ func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl
return return
} }
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
@ -463,7 +463,7 @@ func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl
return return
} }
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
@ -504,7 +504,7 @@ func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl
return return
} }
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
@ -538,7 +538,7 @@ func (a *API) onConfigPathsDelete(ctx *gin.Context) {
return return
} }
err = newConf.Check() err = newConf.Validate()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return

35
internal/conf/conf.go

@ -94,8 +94,6 @@ type Conf struct {
WriteQueueSize int `json:"writeQueueSize"` WriteQueueSize int `json:"writeQueueSize"`
UDPMaxPayloadSize int `json:"udpMaxPayloadSize"` UDPMaxPayloadSize int `json:"udpMaxPayloadSize"`
ExternalAuthenticationURL string `json:"externalAuthenticationURL"` ExternalAuthenticationURL string `json:"externalAuthenticationURL"`
API bool `json:"api"`
APIAddress string `json:"apiAddress"`
Metrics bool `json:"metrics"` Metrics bool `json:"metrics"`
MetricsAddress string `json:"metricsAddress"` MetricsAddress string `json:"metricsAddress"`
PPROF bool `json:"pprof"` PPROF bool `json:"pprof"`
@ -104,6 +102,14 @@ type Conf struct {
RunOnConnectRestart bool `json:"runOnConnectRestart"` RunOnConnectRestart bool `json:"runOnConnectRestart"`
RunOnDisconnect string `json:"runOnDisconnect"` RunOnDisconnect string `json:"runOnDisconnect"`
// API
API bool `json:"api"`
APIAddress string `json:"apiAddress"`
// Playback
Playback bool `json:"playback"`
PlaybackAddress string `json:"playbackAddress"`
// RTSP server // RTSP server
RTSP bool `json:"rtsp"` RTSP bool `json:"rtsp"`
RTSPDisable *bool `json:"rtspDisable,omitempty"` // deprecated RTSPDisable *bool `json:"rtspDisable,omitempty"` // deprecated
@ -195,11 +201,16 @@ func (conf *Conf) setDefaults() {
conf.WriteTimeout = 10 * StringDuration(time.Second) conf.WriteTimeout = 10 * StringDuration(time.Second)
conf.WriteQueueSize = 512 conf.WriteQueueSize = 512
conf.UDPMaxPayloadSize = 1472 conf.UDPMaxPayloadSize = 1472
conf.APIAddress = "127.0.0.1:9997"
conf.MetricsAddress = "127.0.0.1:9998" conf.MetricsAddress = "127.0.0.1:9998"
conf.PPROFAddress = "127.0.0.1:9999" conf.PPROFAddress = "127.0.0.1:9999"
// RTSP // API
conf.APIAddress = "127.0.0.1:9997"
// Playback server
conf.PlaybackAddress = ":9996"
// RTSP server
conf.RTSP = true conf.RTSP = true
conf.Protocols = Protocols{ conf.Protocols = Protocols{
Protocol(gortsplib.TransportUDP): {}, Protocol(gortsplib.TransportUDP): {},
@ -217,7 +228,7 @@ func (conf *Conf) setDefaults() {
conf.ServerCert = "server.crt" conf.ServerCert = "server.crt"
conf.AuthMethods = AuthMethods{headers.AuthBasic} conf.AuthMethods = AuthMethods{headers.AuthBasic}
// RTMP // RTMP server
conf.RTMP = true conf.RTMP = true
conf.RTMPAddress = ":1935" conf.RTMPAddress = ":1935"
conf.RTMPSAddress = ":1936" conf.RTMPSAddress = ":1936"
@ -236,7 +247,7 @@ func (conf *Conf) setDefaults() {
conf.HLSSegmentMaxSize = 50 * 1024 * 1024 conf.HLSSegmentMaxSize = 50 * 1024 * 1024
conf.HLSAllowOrigin = "*" conf.HLSAllowOrigin = "*"
// WebRTC // WebRTC server
conf.WebRTC = true conf.WebRTC = true
conf.WebRTCAddress = ":8889" conf.WebRTCAddress = ":8889"
conf.WebRTCServerKey = "server.key" conf.WebRTCServerKey = "server.key"
@ -248,7 +259,7 @@ func (conf *Conf) setDefaults() {
conf.WebRTCAdditionalHosts = []string{} conf.WebRTCAdditionalHosts = []string{}
conf.WebRTCICEServers2 = []WebRTCICEServer{} conf.WebRTCICEServers2 = []WebRTCICEServer{}
// SRT // SRT server
conf.SRT = true conf.SRT = true
conf.SRTAddress = ":8890" conf.SRTAddress = ":8890"
@ -274,7 +285,7 @@ func Load(fpath string, defaultConfPaths []string) (*Conf, string, error) {
return nil, "", err return nil, "", err
} }
err = conf.Check() err = conf.Validate()
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
@ -337,8 +348,8 @@ func (conf Conf) Clone() *Conf {
return &dest return &dest
} }
// Check checks the configuration for errors. // Validate checks the configuration for errors.
func (conf *Conf) Check() error { func (conf *Conf) Validate() error {
// General // General
if conf.ReadBufferCount != nil { if conf.ReadBufferCount != nil {
@ -436,7 +447,7 @@ func (conf *Conf) Check() error {
} }
} }
// Record // Record (deprecated)
if conf.Record != nil { if conf.Record != nil {
conf.PathDefaults.Record = *conf.Record conf.PathDefaults.Record = *conf.Record
} }
@ -479,7 +490,7 @@ func (conf *Conf) Check() error {
pconf := newPath(&conf.PathDefaults, optional) pconf := newPath(&conf.PathDefaults, optional)
conf.Paths[name] = pconf conf.Paths[name] = pconf
err := pconf.check(conf, name) err := pconf.validate(conf, name)
if err != nil { if err != nil {
return err return err
} }

1
internal/conf/conf_test.go

@ -52,6 +52,7 @@ func TestConfFromFile(t *testing.T) {
Source: "publisher", Source: "publisher",
SourceOnDemandStartTimeout: 10 * StringDuration(time.Second), SourceOnDemandStartTimeout: 10 * StringDuration(time.Second),
SourceOnDemandCloseAfter: 10 * StringDuration(time.Second), SourceOnDemandCloseAfter: 10 * StringDuration(time.Second),
Playback: true,
RecordPath: "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f", RecordPath: "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f",
RecordFormat: RecordFormatFMP4, RecordFormat: RecordFormatFMP4,
RecordPartDuration: 100000000, RecordPartDuration: 100000000,

40
internal/conf/credential.go

@ -39,7 +39,7 @@ func (d *Credential) UnmarshalJSON(b []byte) error {
value: in, value: in,
} }
return d.validateConfig() return d.validate()
} }
// UnmarshalEnv implements env.Unmarshaler. // UnmarshalEnv implements env.Unmarshaler.
@ -97,26 +97,24 @@ func (d *Credential) Check(guess string) bool {
return d.value == guess return d.value == guess
} }
func (d *Credential) validateConfig() error { func (d *Credential) validate() error {
if d.IsEmpty() { if !d.IsEmpty() {
return nil switch {
} case d.IsSha256():
if !reBase64.MatchString(d.value) {
switch { return fmt.Errorf("credential contains unsupported characters, sha256 hash must be base64 encoded")
case d.IsSha256(): }
if !reBase64.MatchString(d.value) { case d.IsArgon2():
return fmt.Errorf("credential contains unsupported characters, sha256 hash must be base64 encoded") // TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go:
} // https://go-review.googlesource.com/c/crypto/+/502515
case d.IsArgon2(): _, err := argon2.Decode([]byte(d.value[len("argon2:"):]))
// TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go: if err != nil {
// https://go-review.googlesource.com/c/crypto/+/502515 return fmt.Errorf("invalid argon2 hash: %w", err)
_, err := argon2.Decode([]byte(d.value[len("argon2:"):])) }
if err != nil { default:
return fmt.Errorf("invalid argon2 hash: %w", err) if !rePlainCredential.MatchString(d.value) {
} return fmt.Errorf("credential contains unsupported characters. Supported are: %s", plainCredentialSupportedChars)
default: }
if !rePlainCredential.MatchString(d.value) {
return fmt.Errorf("credential contains unsupported characters. Supported are: %s", plainCredentialSupportedChars)
} }
} }
return nil return nil

4
internal/conf/credential_test.go

@ -102,7 +102,7 @@ func TestCredential(t *testing.T) {
assert.False(t, cred.Check("notestuser")) assert.False(t, cred.Check("notestuser"))
}) })
t.Run("validateConfig", func(t *testing.T) { t.Run("validate", func(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
cred *Credential cred *Credential
@ -155,7 +155,7 @@ func TestCredential(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
err := tt.cred.validateConfig() err := tt.cred.validate()
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) assert.Error(t, err)
} else { } else {

50
internal/conf/path.go

@ -16,8 +16,7 @@ import (
var rePathName = regexp.MustCompile(`^[0-9a-zA-Z_\-/\.~]+$`) var rePathName = regexp.MustCompile(`^[0-9a-zA-Z_\-/\.~]+$`)
// IsValidPathName checks if a path name is valid. func isValidPathName(name string) error {
func IsValidPathName(name string) error {
if name == "" { if name == "" {
return fmt.Errorf("cannot be empty") return fmt.Errorf("cannot be empty")
} }
@ -47,6 +46,41 @@ func srtCheckPassphrase(passphrase string) error {
} }
} }
// FindPathConf returns the configuration corresponding to the given path name.
func FindPathConf(pathConfs map[string]*Path, name string) (string, *Path, []string, error) {
err := isValidPathName(name)
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)
}
// normal path
if pathConf, ok := pathConfs[name]; ok {
return name, pathConf, nil, nil
}
// regular expression-based path
for pathConfName, pathConf := range pathConfs {
if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
}
}
}
// all_others
for pathConfName, pathConf := range pathConfs {
if pathConfName == "all" || pathConfName == "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
}
}
}
return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
}
// Path is a path configuration. // Path is a path configuration.
type Path struct { type Path struct {
Regexp *regexp.Regexp `json:"-"` // filled by Check() Regexp *regexp.Regexp `json:"-"` // filled by Check()
@ -62,8 +96,9 @@ type Path struct {
SRTReadPassphrase string `json:"srtReadPassphrase"` SRTReadPassphrase string `json:"srtReadPassphrase"`
Fallback string `json:"fallback"` Fallback string `json:"fallback"`
// Record // Record and playback
Record bool `json:"record"` Record bool `json:"record"`
Playback bool `json:"playback"`
RecordPath string `json:"recordPath"` RecordPath string `json:"recordPath"`
RecordFormat RecordFormat `json:"recordFormat"` RecordFormat RecordFormat `json:"recordFormat"`
RecordPartDuration StringDuration `json:"recordPartDuration"` RecordPartDuration StringDuration `json:"recordPartDuration"`
@ -152,7 +187,8 @@ func (pconf *Path) setDefaults() {
pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second)
pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second) pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second)
// Record // Record and playback
pconf.Playback = true
pconf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f" pconf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f"
pconf.RecordFormat = RecordFormatFMP4 pconf.RecordFormat = RecordFormatFMP4
pconf.RecordPartDuration = 100 * StringDuration(time.Millisecond) pconf.RecordPartDuration = 100 * StringDuration(time.Millisecond)
@ -212,7 +248,7 @@ func (pconf Path) Clone() *Path {
return &dest return &dest
} }
func (pconf *Path) check(conf *Conf, name string) error { func (pconf *Path) validate(conf *Conf, name string) error {
pconf.Name = name pconf.Name = name
switch { switch {
@ -220,7 +256,7 @@ func (pconf *Path) check(conf *Conf, name string) error {
pconf.Regexp = regexp.MustCompile("^.*$") pconf.Regexp = regexp.MustCompile("^.*$")
case name == "" || name[0] != '~': // normal path case name == "" || name[0] != '~': // normal path
err := IsValidPathName(name) err := isValidPathName(name)
if err != nil { if err != nil {
return fmt.Errorf("invalid path name '%s': %w", name, err) return fmt.Errorf("invalid path name '%s': %w", name, err)
} }
@ -325,7 +361,7 @@ func (pconf *Path) check(conf *Conf, name string) error {
} }
if pconf.Fallback != "" { if pconf.Fallback != "" {
if strings.HasPrefix(pconf.Fallback, "/") { if strings.HasPrefix(pconf.Fallback, "/") {
err := IsValidPathName(pconf.Fallback[1:]) err := isValidPathName(pconf.Fallback[1:])
if err != nil { if err != nil {
return fmt.Errorf("'%s': %w", pconf.Fallback, err) return fmt.Errorf("'%s': %w", pconf.Fallback, err)
} }

65
internal/core/core.go

@ -22,6 +22,7 @@ import (
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/metrics" "github.com/bluenviron/mediamtx/internal/metrics"
"github.com/bluenviron/mediamtx/internal/playback"
"github.com/bluenviron/mediamtx/internal/pprof" "github.com/bluenviron/mediamtx/internal/pprof"
"github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/rlimit" "github.com/bluenviron/mediamtx/internal/rlimit"
@ -48,7 +49,7 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
for _, pa := range paths { for _, pa := range paths {
if pa.Record && pa.RecordDeleteAfter != 0 { if pa.Record && pa.RecordDeleteAfter != 0 {
entry := record.CleanerEntry{ entry := record.CleanerEntry{
PathFormat: pa.RecordPath, Path: pa.RecordPath,
Format: pa.RecordFormat, Format: pa.RecordFormat,
DeleteAfter: time.Duration(pa.RecordDeleteAfter), DeleteAfter: time.Duration(pa.RecordDeleteAfter),
} }
@ -65,8 +66,8 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
} }
sort.Slice(out2, func(i, j int) bool { sort.Slice(out2, func(i, j int) bool {
if out2[i].PathFormat != out2[j].PathFormat { if out2[i].Path != out2[j].Path {
return out2[i].PathFormat < out2[j].PathFormat return out2[i].Path < out2[j].Path
} }
return out2[i].DeleteAfter < out2[j].DeleteAfter return out2[i].DeleteAfter < out2[j].DeleteAfter
}) })
@ -90,6 +91,7 @@ type Core struct {
metrics *metrics.Metrics metrics *metrics.Metrics
pprof *pprof.PPROF pprof *pprof.PPROF
recordCleaner *record.Cleaner recordCleaner *record.Cleaner
playbackServer *playback.Server
pathManager *pathManager pathManager *pathManager
rtspServer *rtsp.Server rtspServer *rtsp.Server
rtspsServer *rtsp.Server rtspsServer *rtsp.Server
@ -312,20 +314,35 @@ func (p *Core) createResources(initial bool) error {
p.recordCleaner.Initialize() p.recordCleaner.Initialize()
} }
if p.conf.Playback &&
p.playbackServer == nil {
p.playbackServer = &playback.Server{
Address: p.conf.PlaybackAddress,
ReadTimeout: p.conf.ReadTimeout,
PathConfs: p.conf.Paths,
Parent: p,
}
err := p.playbackServer.Initialize()
if err != nil {
return err
}
}
if p.pathManager == nil { if p.pathManager == nil {
p.pathManager = newPathManager( p.pathManager = &pathManager{
p.conf.LogLevel, logLevel: p.conf.LogLevel,
p.conf.ExternalAuthenticationURL, externalAuthenticationURL: p.conf.ExternalAuthenticationURL,
p.conf.RTSPAddress, rtspAddress: p.conf.RTSPAddress,
p.conf.AuthMethods, authMethods: p.conf.AuthMethods,
p.conf.ReadTimeout, readTimeout: p.conf.ReadTimeout,
p.conf.WriteTimeout, writeTimeout: p.conf.WriteTimeout,
p.conf.WriteQueueSize, writeQueueSize: p.conf.WriteQueueSize,
p.conf.UDPMaxPayloadSize, udpMaxPayloadSize: p.conf.UDPMaxPayloadSize,
p.conf.Paths, pathConfs: p.conf.Paths,
p.externalCmdPool, externalCmdPool: p.externalCmdPool,
p, parent: p,
) }
p.pathManager.initialize()
if p.metrics != nil { if p.metrics != nil {
p.metrics.SetPathManager(p.pathManager) p.metrics.SetPathManager(p.pathManager)
@ -619,6 +636,15 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
!reflect.DeepEqual(gatherCleanerEntries(newConf.Paths), gatherCleanerEntries(p.conf.Paths)) || !reflect.DeepEqual(gatherCleanerEntries(newConf.Paths), gatherCleanerEntries(p.conf.Paths)) ||
closeLogger closeLogger
closePlaybackServer := newConf == nil ||
newConf.Playback != p.conf.Playback ||
newConf.PlaybackAddress != p.conf.PlaybackAddress ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
closeLogger
if !closePlaybackServer && p.playbackServer != nil && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.playbackServer.ReloadPathConfs(newConf.Paths)
}
closePathManager := newConf == nil || closePathManager := newConf == nil ||
newConf.LogLevel != p.conf.LogLevel || newConf.LogLevel != p.conf.LogLevel ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
@ -631,7 +657,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeMetrics || closeMetrics ||
closeLogger closeLogger
if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.ReloadConf(newConf.Paths) p.pathManager.ReloadPathConfs(newConf.Paths)
} }
closeRTSPServer := newConf == nil || closeRTSPServer := newConf == nil ||
@ -865,6 +891,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.pathManager = nil p.pathManager = nil
} }
if closePlaybackServer && p.playbackServer != nil {
p.playbackServer.Close()
p.playbackServer = nil
}
if closeRecorderCleaner && p.recordCleaner != nil { if closeRecorderCleaner && p.recordCleaner != nil {
p.recordCleaner.Close() p.recordCleaner.Close()
p.recordCleaner = nil p.recordCleaner = nil

77
internal/core/path.go

@ -64,6 +64,7 @@ type pathAPIPathsGetReq struct {
} }
type path struct { type path struct {
parentCtx context.Context
logLevel conf.LogLevel logLevel conf.LogLevel
rtspAddress string rtspAddress string
readTimeout conf.StringDuration readTimeout conf.StringDuration
@ -115,65 +116,33 @@ type path struct {
done chan struct{} done chan struct{}
} }
func newPath( func (pa *path) initialize() {
parentCtx context.Context, ctx, ctxCancel := context.WithCancel(pa.parentCtx)
logLevel conf.LogLevel,
rtspAddress string, pa.ctx = ctx
readTimeout conf.StringDuration, pa.ctxCancel = ctxCancel
writeTimeout conf.StringDuration, pa.readers = make(map[defs.Reader]struct{})
writeQueueSize int, pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
udpMaxPayloadSize int, pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
confName string, pa.onDemandPublisherReadyTimer = newEmptyTimer()
cnf *conf.Path, pa.onDemandPublisherCloseTimer = newEmptyTimer()
name string, pa.chReloadConf = make(chan *conf.Path)
matches []string, pa.chStaticSourceSetReady = make(chan defs.PathSourceStaticSetReadyReq)
wg *sync.WaitGroup, pa.chStaticSourceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq)
externalCmdPool *externalcmd.Pool, pa.chDescribe = make(chan defs.PathDescribeReq)
parent pathParent, pa.chAddPublisher = make(chan defs.PathAddPublisherReq)
) *path { pa.chRemovePublisher = make(chan defs.PathRemovePublisherReq)
ctx, ctxCancel := context.WithCancel(parentCtx) pa.chStartPublisher = make(chan defs.PathStartPublisherReq)
pa.chStopPublisher = make(chan defs.PathStopPublisherReq)
pa := &path{ pa.chAddReader = make(chan defs.PathAddReaderReq)
logLevel: logLevel, pa.chRemoveReader = make(chan defs.PathRemoveReaderReq)
rtspAddress: rtspAddress, pa.chAPIPathsGet = make(chan pathAPIPathsGetReq)
readTimeout: readTimeout, pa.done = make(chan struct{})
writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize,
udpMaxPayloadSize: udpMaxPayloadSize,
confName: confName,
conf: cnf,
name: name,
matches: matches,
wg: wg,
externalCmdPool: externalCmdPool,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
readers: make(map[defs.Reader]struct{}),
onDemandStaticSourceReadyTimer: newEmptyTimer(),
onDemandStaticSourceCloseTimer: newEmptyTimer(),
onDemandPublisherReadyTimer: newEmptyTimer(),
onDemandPublisherCloseTimer: newEmptyTimer(),
chReloadConf: make(chan *conf.Path),
chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq),
chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq),
chDescribe: make(chan defs.PathDescribeReq),
chAddPublisher: make(chan defs.PathAddPublisherReq),
chRemovePublisher: make(chan defs.PathRemovePublisherReq),
chStartPublisher: make(chan defs.PathStartPublisherReq),
chStopPublisher: make(chan defs.PathStopPublisherReq),
chAddReader: make(chan defs.PathAddReaderReq),
chRemoveReader: make(chan defs.PathRemoveReaderReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq),
done: make(chan struct{}),
}
pa.Log(logger.Debug, "created") pa.Log(logger.Debug, "created")
pa.wg.Add(1) pa.wg.Add(1)
go pa.run() go pa.run()
return pa
} }
func (pa *path) close() { func (pa *path) close() {

183
internal/core/path_manager.go

@ -35,40 +35,6 @@ func pathConfCanBeUpdated(oldPathConf *conf.Path, newPathConf *conf.Path) bool {
return newPathConf.Equal(clone) return newPathConf.Equal(clone)
} }
func getConfForPath(pathConfs map[string]*conf.Path, name string) (string, *conf.Path, []string, error) {
err := conf.IsValidPathName(name)
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name)
}
// normal path
if pathConf, ok := pathConfs[name]; ok {
return name, pathConf, nil, nil
}
// regular expression-based path
for pathConfName, pathConf := range pathConfs {
if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
}
}
}
// process path configuration "all_others" after everything else
for pathConfName, pathConf := range pathConfs {
if pathConfName == "all" || pathConfName == "all_others" {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
}
}
}
return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
}
type pathManagerHLSServer interface { type pathManagerHLSServer interface {
PathReady(defs.Path) PathReady(defs.Path)
PathNotReady(defs.Path) PathNotReady(defs.Path)
@ -99,62 +65,37 @@ type pathManager struct {
pathsByConf map[string]map[*path]struct{} pathsByConf map[string]map[*path]struct{}
// in // in
chReloadConf chan map[string]*conf.Path chReloadConf chan map[string]*conf.Path
chSetHLSServer chan pathManagerHLSServer chSetHLSServer chan pathManagerHLSServer
chClosePath chan *path chClosePath chan *path
chPathReady chan *path chPathReady chan *path
chPathNotReady chan *path chPathNotReady chan *path
chGetConfForPath chan defs.PathGetConfForPathReq chFindPathConf chan defs.PathFindPathConfReq
chDescribe chan defs.PathDescribeReq chDescribe chan defs.PathDescribeReq
chAddReader chan defs.PathAddReaderReq chAddReader chan defs.PathAddReaderReq
chAddPublisher chan defs.PathAddPublisherReq chAddPublisher chan defs.PathAddPublisherReq
chAPIPathsList chan pathAPIPathsListReq chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq chAPIPathsGet chan pathAPIPathsGetReq
} }
func newPathManager( func (pm *pathManager) initialize() {
logLevel conf.LogLevel,
externalAuthenticationURL string,
rtspAddress string,
authMethods conf.AuthMethods,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
writeQueueSize int,
udpMaxPayloadSize int,
pathConfs map[string]*conf.Path,
externalCmdPool *externalcmd.Pool,
parent pathManagerParent,
) *pathManager {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
pm := &pathManager{ pm.ctx = ctx
logLevel: logLevel, pm.ctxCancel = ctxCancel
externalAuthenticationURL: externalAuthenticationURL, pm.paths = make(map[string]*path)
rtspAddress: rtspAddress, pm.pathsByConf = make(map[string]map[*path]struct{})
authMethods: authMethods, pm.chReloadConf = make(chan map[string]*conf.Path)
readTimeout: readTimeout, pm.chSetHLSServer = make(chan pathManagerHLSServer)
writeTimeout: writeTimeout, pm.chClosePath = make(chan *path)
writeQueueSize: writeQueueSize, pm.chPathReady = make(chan *path)
udpMaxPayloadSize: udpMaxPayloadSize, pm.chPathNotReady = make(chan *path)
pathConfs: pathConfs, pm.chFindPathConf = make(chan defs.PathFindPathConfReq)
externalCmdPool: externalCmdPool, pm.chDescribe = make(chan defs.PathDescribeReq)
parent: parent, pm.chAddReader = make(chan defs.PathAddReaderReq)
ctx: ctx, pm.chAddPublisher = make(chan defs.PathAddPublisherReq)
ctxCancel: ctxCancel, pm.chAPIPathsList = make(chan pathAPIPathsListReq)
paths: make(map[string]*path), pm.chAPIPathsGet = make(chan pathAPIPathsGetReq)
pathsByConf: make(map[string]map[*path]struct{}),
chReloadConf: make(chan map[string]*conf.Path),
chSetHLSServer: make(chan pathManagerHLSServer),
chClosePath: make(chan *path),
chPathReady: make(chan *path),
chPathNotReady: make(chan *path),
chGetConfForPath: make(chan defs.PathGetConfForPathReq),
chDescribe: make(chan defs.PathDescribeReq),
chAddReader: make(chan defs.PathAddReaderReq),
chAddPublisher: make(chan defs.PathAddPublisherReq),
chAPIPathsList: make(chan pathAPIPathsListReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq),
}
for pathConfName, pathConf := range pm.pathConfs { for pathConfName, pathConf := range pm.pathConfs {
if pathConf.Regexp == nil { if pathConf.Regexp == nil {
@ -166,8 +107,6 @@ func newPathManager(
pm.wg.Add(1) pm.wg.Add(1)
go pm.run() go pm.run()
return pm
} }
func (pm *pathManager) close() { func (pm *pathManager) close() {
@ -202,8 +141,8 @@ outer:
case pa := <-pm.chPathNotReady: case pa := <-pm.chPathNotReady:
pm.doPathNotReady(pa) pm.doPathNotReady(pa)
case req := <-pm.chGetConfForPath: case req := <-pm.chFindPathConf:
pm.doGetConfForPath(req) pm.doFindPathConf(req)
case req := <-pm.chDescribe: case req := <-pm.chDescribe:
pm.doDescribe(req) pm.doDescribe(req)
@ -288,25 +227,25 @@ func (pm *pathManager) doPathNotReady(pa *path) {
} }
} }
func (pm *pathManager) doGetConfForPath(req defs.PathGetConfForPathReq) { func (pm *pathManager) doFindPathConf(req defs.PathFindPathConfReq) {
_, pathConf, _, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) _, pathConf, _, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name)
if err != nil { if err != nil {
req.Res <- defs.PathGetConfForPathRes{Err: err} req.Res <- defs.PathFindPathConfRes{Err: err}
return return
} }
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
pathConf, req.AccessRequest) pathConf, req.AccessRequest)
if err != nil { if err != nil {
req.Res <- defs.PathGetConfForPathRes{Err: err} req.Res <- defs.PathFindPathConfRes{Err: err}
return return
} }
req.Res <- defs.PathGetConfForPathRes{Conf: pathConf} req.Res <- defs.PathFindPathConfRes{Conf: pathConf}
} }
func (pm *pathManager) doDescribe(req defs.PathDescribeReq) { func (pm *pathManager) doDescribe(req defs.PathDescribeReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name)
if err != nil { if err != nil {
req.Res <- defs.PathDescribeRes{Err: err} req.Res <- defs.PathDescribeRes{Err: err}
return return
@ -328,7 +267,7 @@ func (pm *pathManager) doDescribe(req defs.PathDescribeReq) {
} }
func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) { func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name)
if err != nil { if err != nil {
req.Res <- defs.PathAddReaderRes{Err: err} req.Res <- defs.PathAddReaderRes{Err: err}
return return
@ -352,7 +291,7 @@ func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) {
} }
func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) { func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name)
if err != nil { if err != nil {
req.Res <- defs.PathAddPublisherRes{Err: err} req.Res <- defs.PathAddPublisherRes{Err: err}
return return
@ -401,21 +340,23 @@ func (pm *pathManager) createPath(
name string, name string,
matches []string, matches []string,
) { ) {
pa := newPath( pa := &path{
pm.ctx, parentCtx: pm.ctx,
pm.logLevel, logLevel: pm.logLevel,
pm.rtspAddress, rtspAddress: pm.rtspAddress,
pm.readTimeout, readTimeout: pm.readTimeout,
pm.writeTimeout, writeTimeout: pm.writeTimeout,
pm.writeQueueSize, writeQueueSize: pm.writeQueueSize,
pm.udpMaxPayloadSize, udpMaxPayloadSize: pm.udpMaxPayloadSize,
pathConfName, confName: pathConfName,
pathConf, conf: pathConf,
name, name: name,
matches, matches: matches,
&pm.wg, wg: &pm.wg,
pm.externalCmdPool, externalCmdPool: pm.externalCmdPool,
pm) parent: pm,
}
pa.initialize()
pm.paths[name] = pa pm.paths[name] = pa
@ -433,8 +374,8 @@ func (pm *pathManager) removePath(pa *path) {
delete(pm.paths, pa.name) delete(pm.paths, pa.name)
} }
// ReloadConf is called by core. // ReloadPathConfs is called by core.
func (pm *pathManager) ReloadConf(pathConfs map[string]*conf.Path) { func (pm *pathManager) ReloadPathConfs(pathConfs map[string]*conf.Path) {
select { select {
case pm.chReloadConf <- pathConfs: case pm.chReloadConf <- pathConfs:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
@ -469,14 +410,14 @@ func (pm *pathManager) closePath(pa *path) {
} }
// GetConfForPath is called by a reader or publisher. // GetConfForPath is called by a reader or publisher.
func (pm *pathManager) GetConfForPath(req defs.PathGetConfForPathReq) defs.PathGetConfForPathRes { func (pm *pathManager) FindPathConf(req defs.PathFindPathConfReq) defs.PathFindPathConfRes {
req.Res = make(chan defs.PathGetConfForPathRes) req.Res = make(chan defs.PathFindPathConfRes)
select { select {
case pm.chGetConfForPath <- req: case pm.chFindPathConf <- req:
return <-req.Res return <-req.Res
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return defs.PathGetConfForPathRes{Err: fmt.Errorf("terminated")} return defs.PathFindPathConfRes{Err: fmt.Errorf("terminated")}
} }
} }

10
internal/defs/path.go

@ -52,16 +52,16 @@ type PathAccessRequest struct {
RTSPNonce string RTSPNonce string
} }
// PathGetConfForPathRes contains the response of GetConfForPath(). // PathFindPathConfRes contains the response of FindPathConf().
type PathGetConfForPathRes struct { type PathFindPathConfRes struct {
Conf *conf.Path Conf *conf.Path
Err error Err error
} }
// PathGetConfForPathReq contains arguments of GetConfForPath(). // PathFindPathConfReq contains arguments of FindPathConf().
type PathGetConfForPathReq struct { type PathFindPathConfReq struct {
AccessRequest PathAccessRequest AccessRequest PathAccessRequest
Res chan PathGetConfForPathRes Res chan PathFindPathConfRes
} }
// PathDescribeRes contains the response of Describe(). // PathDescribeRes contains the response of Describe().

2
internal/defs/path_manager.go

@ -2,7 +2,7 @@ package defs
// PathManager is a path manager. // PathManager is a path manager.
type PathManager interface { type PathManager interface {
GetConfForPath(req PathGetConfForPathReq) PathGetConfForPathRes FindPathConf(req PathFindPathConfReq) PathFindPathConfRes
Describe(req PathDescribeReq) PathDescribeRes Describe(req PathDescribeReq) PathDescribeRes
AddPublisher(req PathAddPublisherReq) PathAddPublisherRes AddPublisher(req PathAddPublisherReq) PathAddPublisherRes
AddReader(req PathAddReaderReq) PathAddReaderRes AddReader(req PathAddReaderReq) PathAddReaderRes

410
internal/playback/fmp4.go

@ -0,0 +1,410 @@
package playback
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"time"
"github.com/abema/go-mp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
)
const (
sampleFlagIsNonSyncSample = 1 << 16
)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale)
secs := v / time.Second
dec := v % time.Second
return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second)
}
func durationMp4ToGo(v uint64, timeScale uint32) time.Duration {
timeScale64 := uint64(timeScale)
secs := v / timeScale64
dec := v % timeScale64
return time.Duration(secs)*time.Second + time.Duration(dec)*time.Second/time.Duration(timeScale64)
}
var errTerminated = errors.New("terminated")
func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
buf := make([]byte, 8)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}
if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return nil, fmt.Errorf("ftyp box not found")
}
ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return nil, err
}
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return nil, fmt.Errorf("moov box not found")
}
moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
buf = make([]byte, ftypSize+moovSize)
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
}
return buf, nil
}
func seekAndMuxParts(
r io.ReadSeeker,
init []byte,
minTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
minTimeMP4 := durationGoToMp4(minTime, 90000)
maxTimeMP4 := durationGoToMp4(maxTime, 90000)
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
var outPart *fmp4.Part
var outTrack *fmp4.PartTrack
var outBuf seekablebuffer.Buffer
elapsed := uint64(0)
initWritten := false
firstSampleWritten := make(map[uint32]struct{})
gop := make(map[uint32][]*fmp4.PartSample)
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
switch h.BoxInfo.Type.String() {
case "moof":
moofOffset = h.BoxInfo.Offset
outPart = &fmp4.Part{}
return h.Expand()
case "traf":
return h.Expand()
case "tfhd":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfhd = box.(*mp4.Tfhd)
case "tfdt":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfdt = box.(*mp4.Tfdt)
if tfdt.BaseMediaDecodeTimeV1 >= maxTimeMP4 {
return nil, errTerminated
}
outTrack = &fmp4.PartTrack{ID: int(tfhd.TrackID)}
case "trun":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
trun := box.(*mp4.Trun)
dataOffset := moofOffset + uint64(trun.DataOffset)
_, err = r.Seek(int64(dataOffset), io.SeekStart)
if err != nil {
return nil, err
}
elapsed = tfdt.BaseMediaDecodeTimeV1
baseTimeSet := false
for _, e := range trun.Entries {
payload := make([]byte, e.SampleSize)
_, err := io.ReadFull(r, payload)
if err != nil {
return nil, err
}
if elapsed >= maxTimeMP4 {
break
}
isRandom := (e.SampleFlags & sampleFlagIsNonSyncSample) == 0
_, fsw := firstSampleWritten[tfhd.TrackID]
sa := &fmp4.PartSample{
Duration: e.SampleDuration,
PTSOffset: e.SampleCompositionTimeOffsetV1,
IsNonSyncSample: !isRandom,
Payload: payload,
}
if !fsw {
if isRandom {
gop[tfhd.TrackID] = []*fmp4.PartSample{sa}
} else {
gop[tfhd.TrackID] = append(gop[tfhd.TrackID], sa)
}
}
if elapsed >= minTimeMP4 {
if !baseTimeSet {
outTrack.BaseTime = elapsed - minTimeMP4
if !fsw {
if !isRandom {
for _, sa2 := range gop[tfhd.TrackID][:len(gop[tfhd.TrackID])-1] {
sa2.Duration = 0
sa2.PTSOffset = 0
outTrack.Samples = append(outTrack.Samples, sa2)
}
}
delete(gop, tfhd.TrackID)
firstSampleWritten[tfhd.TrackID] = struct{}{}
}
}
outTrack.Samples = append(outTrack.Samples, sa)
}
elapsed += uint64(e.SampleDuration)
}
if outTrack.Samples != nil {
outPart.Tracks = append(outPart.Tracks, outTrack)
}
outTrack = nil
case "mdat":
if outPart.Tracks != nil {
if !initWritten {
initWritten = true
_, err := w.Write(init)
if err != nil {
return nil, err
}
}
err := outPart.Marshal(&outBuf)
if err != nil {
return nil, err
}
_, err = w.Write(outBuf.Bytes())
if err != nil {
return nil, err
}
outBuf.Reset()
}
outPart = nil
}
return nil, nil
})
if err != nil && !errors.Is(err, errTerminated) {
return 0, err
}
if !initWritten {
return 0, errNoSegmentsFound
}
elapsed -= minTimeMP4
return durationMp4ToGo(elapsed, 90000), nil
}
func muxParts(
r io.ReadSeeker,
startTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
maxTimeMP4 := durationGoToMp4(maxTime, 90000)
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
var outPart *fmp4.Part
var outTrack *fmp4.PartTrack
var outBuf seekablebuffer.Buffer
elapsed := uint64(0)
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
switch h.BoxInfo.Type.String() {
case "moof":
moofOffset = h.BoxInfo.Offset
outPart = &fmp4.Part{}
return h.Expand()
case "traf":
return h.Expand()
case "tfhd":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfhd = box.(*mp4.Tfhd)
case "tfdt":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
tfdt = box.(*mp4.Tfdt)
if tfdt.BaseMediaDecodeTimeV1 >= maxTimeMP4 {
return nil, errTerminated
}
outTrack = &fmp4.PartTrack{
ID: int(tfhd.TrackID),
BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, 90000),
}
case "trun":
box, _, err := h.ReadPayload()
if err != nil {
return nil, err
}
trun := box.(*mp4.Trun)
dataOffset := moofOffset + uint64(trun.DataOffset)
_, err = r.Seek(int64(dataOffset), io.SeekStart)
if err != nil {
return nil, err
}
elapsed = tfdt.BaseMediaDecodeTimeV1
for _, e := range trun.Entries {
payload := make([]byte, e.SampleSize)
_, err := io.ReadFull(r, payload)
if err != nil {
return nil, err
}
if elapsed >= maxTimeMP4 {
break
}
isRandom := (e.SampleFlags & sampleFlagIsNonSyncSample) == 0
sa := &fmp4.PartSample{
Duration: e.SampleDuration,
PTSOffset: e.SampleCompositionTimeOffsetV1,
IsNonSyncSample: !isRandom,
Payload: payload,
}
outTrack.Samples = append(outTrack.Samples, sa)
elapsed += uint64(e.SampleDuration)
}
if outTrack.Samples != nil {
outPart.Tracks = append(outPart.Tracks, outTrack)
}
outTrack = nil
case "mdat":
if outPart.Tracks != nil {
err := outPart.Marshal(&outBuf)
if err != nil {
return nil, err
}
_, err = w.Write(outBuf.Bytes())
if err != nil {
return nil, err
}
outBuf.Reset()
}
outPart = nil
}
return nil, nil
})
if err != nil && !errors.Is(err, errTerminated) {
return 0, err
}
return durationMp4ToGo(elapsed, 90000), nil
}
func fmp4SeekAndMux(
fpath string,
minTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
init, err := fmp4ReadInit(f)
if err != nil {
return 0, err
}
elapsed, err := seekAndMuxParts(f, init, minTime, maxTime, w)
if err != nil {
return 0, err
}
return elapsed, nil
}
func fmp4Mux(
fpath string,
startTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
elapsed, err := muxParts(f, startTime, maxTime, w)
if err != nil {
return 0, err
}
return elapsed, nil
}

79
internal/playback/fmp4_test.go

@ -0,0 +1,79 @@
package playback
import (
"io"
"os"
"testing"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
)
func writeBenchInit(f io.WriteSeeker) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
0x20,
},
PPS: []byte{0x08},
},
},
{
ID: 2,
TimeScale: 90000,
Codec: &fmp4.CodecMPEG4Audio{
Config: mpeg4audio.Config{
Type: mpeg4audio.ObjectTypeAACLC,
SampleRate: 48000,
ChannelCount: 2,
},
},
},
},
}
err := init.Marshal(f)
if err != nil {
panic(err)
}
_, err = f.Write([]byte{
'm', 'o', 'o', 'f', 0x00, 0x00, 0x00, 0x10,
})
if err != nil {
panic(err)
}
}
func BenchmarkFMP4ReadInit(b *testing.B) {
f, err := os.CreateTemp(os.TempDir(), "mediamtx-playback-fmp4-")
if err != nil {
panic(err)
}
defer os.Remove(f.Name())
writeBenchInit(f)
f.Close()
for n := 0; n < b.N; n++ {
func() {
f, err := os.Open(f.Name())
if err != nil {
panic(err)
}
defer f.Close()
_, err = fmp4ReadInit(f)
if err != nil {
panic(err)
}
}()
}
}

299
internal/playback/server.go

@ -0,0 +1,299 @@
// Package playback contains the playback server.
package playback
import (
"errors"
"fmt"
"io/fs"
"net"
"net/http"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpserv"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
"github.com/gin-gonic/gin"
)
const (
concatenationTolerance = 1 * time.Second
)
var errNoSegmentsFound = errors.New("no recording segments found for the given timestamp")
type writerWrapper struct {
ctx *gin.Context
written bool
}
func (w *writerWrapper) Write(p []byte) (int, error) {
if !w.written {
w.written = true
w.ctx.Header("Accept-Ranges", "none")
w.ctx.Header("Content-Type", "video/mp4")
}
return w.ctx.Writer.Write(p)
}
type segment struct {
fpath string
start time.Time
}
func findSegments(
pathConf *conf.Path,
pathName string,
start time.Time,
duration time.Duration,
) ([]segment, error) {
if !pathConf.Playback {
return nil, fmt.Errorf("playback is disabled on path '%s'", pathName)
}
recordPath := record.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)
// we have to convert to absolute paths
// otherwise, recordPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)
commonPath := record.CommonPath(recordPath)
end := start.Add(duration)
var segments []segment
// gather all segments that starts before the end of the playback
err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
var pa record.Path
ok := pa.Decode(recordPath, fpath)
if ok && !end.Before(time.Time(pa)) {
segments = append(segments, segment{
fpath: fpath,
start: time.Time(pa),
})
}
}
return nil
})
if err != nil {
return nil, err
}
if segments == nil {
return nil, errNoSegmentsFound
}
sort.Slice(segments, func(i, j int) bool {
return segments[i].start.Before(segments[j].start)
})
// find the segment that may contain the start of the playback and remove all previous ones
found := false
for i := 0; i < len(segments)-1; i++ {
if !start.Before(segments[i].start) && start.Before(segments[i+1].start) {
segments = segments[i:]
found = true
break
}
}
// otherwise, keep the last segment only and check whether it may contain the start of the playback
if !found {
segments = segments[len(segments)-1:]
if segments[len(segments)-1].start.After(start) {
return nil, errNoSegmentsFound
}
}
return segments, nil
}
// Server is the playback server.
type Server struct {
Address string
ReadTimeout conf.StringDuration
PathConfs map[string]*conf.Path
Parent logger.Writer
httpServer *httpserv.WrappedServer
mutex sync.RWMutex
}
// Initialize initializes API.
func (p *Server) Initialize() error {
router := gin.New()
router.SetTrustedProxies(nil) //nolint:errcheck
group := router.Group("/")
group.GET("/get", p.onGet)
network, address := restrictnetwork.Restrict("tcp", p.Address)
var err error
p.httpServer, err = httpserv.NewWrappedServer(
network,
address,
time.Duration(p.ReadTimeout),
"",
"",
router,
p,
)
if err != nil {
return err
}
p.Log(logger.Info, "listener opened on "+address)
return nil
}
// Close closes Server.
func (p *Server) Close() {
p.Log(logger.Info, "listener is closing")
p.httpServer.Close()
}
// Log implements logger.Writer.
func (p *Server) Log(level logger.Level, format string, args ...interface{}) {
p.Parent.Log(level, "[playback] "+format, args...)
}
// ReloadPathConfs is called by core.Core.
func (p *Server) ReloadPathConfs(pathConfs map[string]*conf.Path) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.PathConfs = pathConfs
}
func (p *Server) writeError(ctx *gin.Context, status int, err error) {
// show error in logs
p.Log(logger.Error, err.Error())
// add error to response
ctx.String(status, err.Error())
}
func (p *Server) safeFindPathConf(name string) (*conf.Path, error) {
p.mutex.RLock()
defer p.mutex.RUnlock()
_, pathConf, _, err := conf.FindPathConf(p.PathConfs, name)
return pathConf, err
}
func (p *Server) onGet(ctx *gin.Context) {
pathName := ctx.Query("path")
start, err := time.Parse(time.RFC3339, ctx.Query("start"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err))
return
}
duration, err := time.ParseDuration(ctx.Query("duration"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err))
return
}
format := ctx.Query("format")
if format != "fmp4" {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format))
return
}
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
segments, err := findSegments(pathConf, pathName, start, duration)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
if pathConf.RecordFormat != conf.RecordFormatFMP4 {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4"))
return
}
ww := &writerWrapper{ctx: ctx}
minTime := start.Sub(segments[0].start)
maxTime := minTime + duration
elapsed, err := fmp4SeekAndMux(
segments[0].fpath,
minTime,
maxTime,
ww)
if err != nil {
// user aborted the download
var neterr *net.OpError
if errors.As(err, &neterr) {
return
}
// nothing has been written yet; send back JSON
if !ww.written {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
// something has been already written: abort and write to logs only
p.Log(logger.Error, err.Error())
return
}
start = start.Add(elapsed)
duration -= elapsed
overallElapsed := elapsed
for _, seg := range segments[1:] {
// there's a gap between segments; stop serving the recording.
if seg.start.Before(start.Add(-concatenationTolerance)) || seg.start.After(start.Add(concatenationTolerance)) {
return
}
elapsed, err := fmp4Mux(seg.fpath, overallElapsed, duration, ctx.Writer)
if err != nil {
// user aborted the download
var neterr *net.OpError
if errors.As(err, &neterr) {
return
}
// something has been already written: abort and write to logs only
p.Log(logger.Error, err.Error())
return
}
start = seg.start.Add(elapsed)
duration -= elapsed
overallElapsed += elapsed
}
}

228
internal/playback/server_test.go

@ -0,0 +1,228 @@
package playback
import (
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"testing"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/stretchr/testify/require"
)
type nilLogger struct{}
func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
func writeSegment1(t *testing.T, fpath string) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
0x20,
},
PPS: []byte{0x08},
},
}},
}
var buf1 seekablebuffer.Buffer
err := init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{},
}},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 30 * 90000,
Samples: []*fmp4.PartSample{
{
Duration: 30 * 90000,
IsNonSyncSample: false,
Payload: []byte{1, 2},
},
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{3, 4},
},
{
Duration: 1 * 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
}},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}
func writeSegment2(t *testing.T, fpath string) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
0x20,
},
PPS: []byte{0x08},
},
}},
}
var buf1 seekablebuffer.Buffer
err := init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{7, 8},
},
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{9, 10},
},
},
}},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}
func TestServer(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-000000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-000000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Playback: true,
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
Parent: &nilLogger{},
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 0, time.Local).Format(time.RFC3339))
v.Set("duration", "2s")
v.Set("format", "fmp4")
u := &url.URL{
Scheme: "http",
Host: "localhost:9996",
Path: "/get",
RawQuery: v.Encode(),
}
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
buf, err := io.ReadAll(res.Body)
require.NoError(t, err)
var parts fmp4.Parts
err = parts.Unmarshal(buf)
require.NoError(t, err)
require.Equal(t, fmp4.Parts{
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.PartSample{
{
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
},
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Payload: []byte{7, 8},
},
},
},
},
},
}, parts)
}

13
internal/record/agent_instance.go

@ -34,15 +34,10 @@ type agentInstance struct {
func (a *agentInstance) initialize() { func (a *agentInstance) initialize() {
a.pathFormat = a.agent.PathFormat a.pathFormat = a.agent.PathFormat
a.pathFormat = strings.ReplaceAll(a.pathFormat, "%path", a.agent.PathName) a.pathFormat = PathAddExtension(
strings.ReplaceAll(a.pathFormat, "%path", a.agent.PathName),
switch a.agent.Format { a.agent.Format,
case conf.RecordFormatMPEGTS: )
a.pathFormat += ".ts"
default:
a.pathFormat += ".mp4"
}
a.terminate = make(chan struct{}) a.terminate = make(chan struct{})
a.done = make(chan struct{}) a.done = make(chan struct{})

50
internal/record/cleaner.go

@ -5,7 +5,6 @@ import (
"io/fs" "io/fs"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time" "time"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
@ -14,36 +13,9 @@ import (
var timeNow = time.Now var timeNow = time.Now
func commonPath(v string) string {
common := ""
remaining := v
for {
i := strings.IndexAny(remaining, "\\/")
if i < 0 {
break
}
var part string
part, remaining = remaining[:i+1], remaining[i+1:]
if strings.Contains(part, "%") {
break
}
common += part
}
if len(common) > 0 {
common = common[:len(common)-1]
}
return common
}
// CleanerEntry is a cleaner entry. // CleanerEntry is a cleaner entry.
type CleanerEntry struct { type CleanerEntry struct {
PathFormat string Path string
Format conf.RecordFormat Format conf.RecordFormat
DeleteAfter time.Duration DeleteAfter time.Duration
} }
@ -108,21 +80,13 @@ func (c *Cleaner) doRun() {
} }
func (c *Cleaner) doRunEntry(e *CleanerEntry) error { func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
pathFormat := e.PathFormat entryPath := PathAddExtension(e.Path, e.Format)
switch e.Format {
case conf.RecordFormatMPEGTS:
pathFormat += ".ts"
default:
pathFormat += ".mp4"
}
// we have to convert to absolute paths // we have to convert to absolute paths
// otherwise, commonPath and fpath inside Walk() won't have common elements // otherwise, entryPath and fpath inside Walk() won't have common elements
pathFormat, _ = filepath.Abs(pathFormat) entryPath, _ = filepath.Abs(entryPath)
commonPath := commonPath(pathFormat) commonPath := CommonPath(entryPath)
now := timeNow() now := timeNow()
filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck
@ -131,8 +95,8 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
} }
if !info.IsDir() { if !info.IsDir() {
var pa path var pa Path
ok := pa.decode(pathFormat, fpath) ok := pa.Decode(entryPath, fpath)
if ok { if ok {
if now.Sub(time.Time(pa)) > e.DeleteAfter { if now.Sub(time.Time(pa)) > e.DeleteAfter {
c.Log(logger.Debug, "removing %s", fpath) c.Log(logger.Debug, "removing %s", fpath)

2
internal/record/cleaner_test.go

@ -32,7 +32,7 @@ func TestCleaner(t *testing.T) {
c := &Cleaner{ c := &Cleaner{
Entries: []CleanerEntry{{ Entries: []CleanerEntry{{
PathFormat: filepath.Join(dir, specialChars+"_%path/%Y-%m-%d_%H-%M-%S-%f"), Path: filepath.Join(dir, specialChars+"_%path/%Y-%m-%d_%H-%M-%S-%f"),
Format: conf.RecordFormatFMP4, Format: conf.RecordFormatFMP4,
DeleteAfter: 10 * time.Second, DeleteAfter: 10 * time.Second,
}}, }},

10
internal/record/format_fmp4_part.go

@ -6,8 +6,8 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"github.com/aler9/writerseeker"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -29,13 +29,13 @@ func writePart(
Tracks: fmp4PartTracks, Tracks: fmp4PartTracks,
} }
var ws writerseeker.WriterSeeker var buf seekablebuffer.Buffer
err := part.Marshal(&ws) err := part.Marshal(&buf)
if err != nil { if err != nil {
return err return err
} }
_, err = f.Write(ws.Bytes()) _, err = f.Write(buf.Bytes())
return err return err
} }
@ -54,7 +54,7 @@ func (p *formatFMP4Part) initialize() {
func (p *formatFMP4Part) close() error { func (p *formatFMP4Part) close() error {
if p.s.fi == nil { if p.s.fi == nil {
p.s.path = path(p.s.startNTP).encode(p.s.f.a.pathFormat) p.s.path = Path(p.s.startNTP).Encode(p.s.f.a.pathFormat)
p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path) p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path)
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)

8
internal/record/format_fmp4_segment.go

@ -5,8 +5,8 @@ import (
"os" "os"
"time" "time"
"github.com/aler9/writerseeker"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -21,13 +21,13 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
Tracks: fmp4Tracks, Tracks: fmp4Tracks,
} }
var ws writerseeker.WriterSeeker var buf seekablebuffer.Buffer
err := init.Marshal(&ws) err := init.Marshal(&buf)
if err != nil { if err != nil {
return err return err
} }
_, err = f.Write(ws.Bytes()) _, err = f.Write(buf.Bytes())
return err return err
} }

2
internal/record/format_mpegts_segment.go

@ -43,7 +43,7 @@ func (s *formatMPEGTSSegment) close() error {
func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil { if s.fi == nil {
s.path = path(s.startNTP).encode(s.f.a.pathFormat) s.path = Path(s.startNTP).Encode(s.f.a.pathFormat)
s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path) s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.path), 0o755) err := os.MkdirAll(filepath.Dir(s.path), 0o755)

54
internal/record/path.go

@ -5,6 +5,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/bluenviron/mediamtx/internal/conf"
) )
func leadingZeros(v int, size int) string { func leadingZeros(v int, size int) string {
@ -21,9 +23,50 @@ func leadingZeros(v int, size int) string {
return out2 + out return out2 + out
} }
type path time.Time // PathAddExtension adds the file extension to path.
func PathAddExtension(path string, format conf.RecordFormat) string {
switch format {
case conf.RecordFormatMPEGTS:
return path + ".ts"
default:
return path + ".mp4"
}
}
// CommonPath returns the common path between all segments with given recording path.
func CommonPath(v string) string {
common := ""
remaining := v
for {
i := strings.IndexAny(remaining, "\\/")
if i < 0 {
break
}
var part string
part, remaining = remaining[:i+1], remaining[i+1:]
if strings.Contains(part, "%") {
break
}
common += part
}
if len(common) > 0 {
common = common[:len(common)-1]
}
return common
}
// Path is a record path.
type Path time.Time
func (p *path) decode(format string, v string) bool { // Decode decodes a Path.
func (p *Path) Decode(format string, v string) bool {
re := format re := format
for _, ch := range []uint8{ for _, ch := range []uint8{
@ -141,15 +184,16 @@ func (p *path) decode(format string, v string) bool {
} }
if unixSec > 0 { if unixSec > 0 {
*p = path(time.Unix(unixSec, 0)) *p = Path(time.Unix(unixSec, 0))
} else { } else {
*p = path(time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)) *p = Path(time.Date(year, month, day, hour, minute, second, micros*1000, time.Local))
} }
return true return true
} }
func (p path) encode(format string) string { // Encode encodes a path.
func (p Path) Encode(format string) string {
format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(time.Time(p).Year()), 10)) format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(time.Time(p).Year()), 10))
format = strings.ReplaceAll(format, "%m", leadingZeros(int(time.Time(p).Month()), 2)) format = strings.ReplaceAll(format, "%m", leadingZeros(int(time.Time(p).Month()), 2))
format = strings.ReplaceAll(format, "%d", leadingZeros(time.Time(p).Day(), 2)) format = strings.ReplaceAll(format, "%d", leadingZeros(time.Time(p).Day(), 2))

12
internal/record/path_test.go

@ -10,19 +10,19 @@ import (
var pathCases = []struct { var pathCases = []struct {
name string name string
format string format string
dec path dec Path
enc string enc string
}{ }{
{ {
"standard", "standard",
"%path/%Y-%m-%d_%H-%M-%S-%f.mp4", "%path/%Y-%m-%d_%H-%M-%S-%f.mp4",
path(time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local)), Path(time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local)),
"%path/2008-11-07_11-22-04-123456.mp4", "%path/2008-11-07_11-22-04-123456.mp4",
}, },
{ {
"unix seconds", "unix seconds",
"%path/%s.mp4", "%path/%s.mp4",
path(time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local()), Path(time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local()),
"%path/1638447323.mp4", "%path/1638447323.mp4",
}, },
} }
@ -30,8 +30,8 @@ var pathCases = []struct {
func TestPathDecode(t *testing.T) { func TestPathDecode(t *testing.T) {
for _, ca := range pathCases { for _, ca := range pathCases {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
var dec path var dec Path
ok := dec.decode(ca.format, ca.enc) ok := dec.Decode(ca.format, ca.enc)
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, ca.dec, dec) require.Equal(t, ca.dec, dec)
}) })
@ -41,7 +41,7 @@ func TestPathDecode(t *testing.T) {
func TestPathEncode(t *testing.T) { func TestPathEncode(t *testing.T) {
for _, ca := range pathCases { for _, ca := range pathCases {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
require.Equal(t, ca.enc, ca.dec.encode(ca.format)) require.Equal(t, ca.enc, ca.dec.Encode(ca.format))
}) })
} }
} }

2
internal/servers/hls/http_server.go

@ -147,7 +147,7 @@ func (s *httpServer) onRequest(ctx *gin.Context) {
user, pass, hasCredentials := ctx.Request.BasicAuth() user, pass, hasCredentials := ctx.Request.BasicAuth()
res := s.pathManager.GetConfForPath(defs.PathGetConfForPathReq{ res := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: defs.PathAccessRequest{ AccessRequest: defs.PathAccessRequest{
Name: dir, Name: dir,
Query: ctx.Request.URL.RawQuery, Query: ctx.Request.URL.RawQuery,

2
internal/servers/webrtc/http_server.go

@ -112,7 +112,7 @@ func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, path string, publ
remoteAddr := net.JoinHostPort(ip, port) remoteAddr := net.JoinHostPort(ip, port)
user, pass, hasCredentials := ctx.Request.BasicAuth() user, pass, hasCredentials := ctx.Request.BasicAuth()
res := s.pathManager.GetConfForPath(defs.PathGetConfForPathReq{ res := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
AccessRequest: defs.PathAccessRequest{ AccessRequest: defs.PathAccessRequest{
Name: path, Name: path,
Query: ctx.Request.URL.RawQuery, Query: ctx.Request.URL.RawQuery,

35
mediamtx.yml

@ -41,11 +41,6 @@ udpMaxPayloadSize: 1472
# it is discarded. # it is discarded.
externalAuthenticationURL: externalAuthenticationURL:
# Enable the HTTP API.
api: no
# Address of the API listener.
apiAddress: 127.0.0.1:9997
# Enable Prometheus-compatible metrics. # Enable Prometheus-compatible metrics.
metrics: no metrics: no
# Address of the metrics listener. # Address of the metrics listener.
@ -69,10 +64,26 @@ runOnConnectRestart: no
# Environment variables are the same of runOnConnect. # Environment variables are the same of runOnConnect.
runOnDisconnect: runOnDisconnect:
###############################################
# Global settings -> API
# Enable controlling the server through the API.
api: no
# Address of the API listener.
apiAddress: 127.0.0.1:9997
###############################################
# Global settings -> Playback server
# Enable downloading recordings from the playback server.
playback: no
# Address of the playback server listener.
playbackAddress: :9996
############################################### ###############################################
# Global settings -> RTSP server # Global settings -> RTSP server
# Allow publishing and reading streams with the RTSP protocol. # Enable publishing and reading streams with the RTSP protocol.
rtsp: yes rtsp: yes
# List of enabled RTSP transport protocols. # List of enabled RTSP transport protocols.
# UDP is the most performant, but doesn't work when there's a NAT/firewall between # UDP is the most performant, but doesn't work when there's a NAT/firewall between
@ -112,7 +123,7 @@ authMethods: [basic]
############################################### ###############################################
# Global settings -> RTMP server # Global settings -> RTMP server
# Allow publishing and reading streams with the RTMP protocol. # Enable publishing and reading streams with the RTMP protocol.
rtmp: yes rtmp: yes
# Address of the RTMP listener. This is needed only when encryption is "no" or "optional". # Address of the RTMP listener. This is needed only when encryption is "no" or "optional".
rtmpAddress: :1935 rtmpAddress: :1935
@ -132,7 +143,7 @@ rtmpServerCert: server.crt
############################################### ###############################################
# Global settings -> HLS server # Global settings -> HLS server
# Allow reading streams with the HLS protocol. # Enable reading streams with the HLS protocol.
hls: yes hls: yes
# Address of the HLS listener. # Address of the HLS listener.
hlsAddress: :8888 hlsAddress: :8888
@ -188,7 +199,7 @@ hlsDirectory: ''
############################################### ###############################################
# Global settings -> WebRTC server # Global settings -> WebRTC server
# Allow publishing and reading streams with the WebRTC protocol. # Enable publishing and reading streams with the WebRTC protocol.
webrtc: yes webrtc: yes
# Address of the WebRTC HTTP listener. # Address of the WebRTC HTTP listener.
webrtcAddress: :8889 webrtcAddress: :8889
@ -236,7 +247,7 @@ webrtcICEServers2: []
############################################### ###############################################
# Global settings -> SRT server # Global settings -> SRT server
# Allow publishing and reading streams with the SRT protocol. # Enable publishing and reading streams with the SRT protocol.
srt: yes srt: yes
# Address of the SRT listener. # Address of the SRT listener.
srtAddress: :8890 srtAddress: :8890
@ -292,10 +303,12 @@ pathDefaults:
fallback: fallback:
############################################### ###############################################
# Default path settings -> Recording # Default path settings -> Record and playback
# Record streams to disk. # Record streams to disk.
record: no record: no
# Enable serving recordings with the playback server.
playback: yes
# Path of recording segments. # Path of recording segments.
# Extension is added automatically. # Extension is added automatically.
# Available variables are %path (path name), %Y %m %d %H %M %S %f %s (time in strftime format) # Available variables are %path (path name), %Y %m %d %H %M %S %f %s (time in strftime format)

Loading…
Cancel
Save