diff --git a/README.md b/README.md index 611b6276..aba22ac3 100644 --- a/README.md +++ b/README.md @@ -20,11 +20,15 @@ Features: * Publish live streams with RTSP (UDP, TCP or TLS mode) or RTMP * Read live streams with RTSP (UDP, UDP-multicast, TCP or TLS mode), RTMP or HLS * Pull and serve streams from other RTSP or RTMP servers or cameras, always or on-demand (RTSP proxy) -* Streams are automatically converted from a protocol to another (for instance, it's possible to publish with RTSP and read with HLS) -* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG) +* Each stream can have multiple video and audio tracks, encoded with any codec, including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG +* Streams are automatically converted from a protocol to another. For instance, it's possible to publish with RTSP and read with HLS + +Plus: + * Serve multiple streams at once in separate paths * Authenticate readers and publishers * Redirect readers to other RTSP servers (load balancing) +* Query and control the server through an HTTP API * Run custom commands when clients connect, disconnect, read or publish streams * Reload the configuration without disconnecting existing clients (hot reloading) * Compatible with Linux, Windows and macOS, does not require any dependency or interpreter, it's a single executable @@ -53,6 +57,7 @@ Features: * [Start on boot with systemd](#start-on-boot-with-systemd) * [Monitoring](#monitoring) * [Corrupted frames](#corrupted-frames) + * [HTTP API](#http-api) * [Command-line usage](#command-line-usage) * [Compile and run from source](#compile-and-run-from-source) * [Links](#links) @@ -538,6 +543,10 @@ In some scenarios, the server can send incomplete or corrupted frames. This can readBufferSize: 8192 ``` +### HTTP API + +TODO + ### Command-line usage ``` diff --git a/go.mod b/go.mod index 2826497e..11da028b 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/aler9/gortsplib v0.0.0-20210731192657-45db8582b0b3 github.com/asticode/go-astits v1.9.0 - github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 + github.com/gin-gonic/gin v1.7.2 github.com/gookit/color v1.4.2 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/notedit/rtmp v0.0.2 diff --git a/go.sum b/go.sum index 2a65f2fd..0026fe8d 100644 --- a/go.sum +++ b/go.sum @@ -15,14 +15,39 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA= +github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gookit/color v1.4.2 h1:tXy44JFSFkKnELV6WaMo/lLfu/meqITX3iAV52do7lk= github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= github.com/icza/bitio v1.0.0 h1:squ/m1SHyFeCA6+6Gyol1AxV9nmPPlJFT8c2vKdj3U8= github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A= github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lToqnXgA8Mz1DP11X4zSJ159C3k= github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= @@ -36,20 +61,28 @@ github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKq github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8= github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= @@ -57,6 +90,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 27283836..063cbe4c 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -57,57 +57,110 @@ func decrypt(key string, byts []byte) ([]byte, error) { // Conf is the main program configuration. type Conf struct { // general - LogLevel string `yaml:"logLevel"` + LogLevel string `yaml:"logLevel" json:"logLevel"` LogLevelParsed logger.Level `yaml:"-" json:"-"` - LogDestinations []string `yaml:"logDestinations"` + LogDestinations []string `yaml:"logDestinations" json:"logDestinations"` LogDestinationsParsed map[logger.Destination]struct{} `yaml:"-" json:"-"` - LogFile string `yaml:"logFile"` - ReadTimeout time.Duration `yaml:"readTimeout"` - WriteTimeout time.Duration `yaml:"writeTimeout"` - ReadBufferCount int `yaml:"readBufferCount"` - Metrics bool `yaml:"metrics"` - MetricsAddress string `yaml:"metricsAddress"` - PPROF bool `yaml:"pprof"` - PPROFAddress string `yaml:"pprofAddress"` - RunOnConnect string `yaml:"runOnConnect"` - RunOnConnectRestart bool `yaml:"runOnConnectRestart"` + LogFile string `yaml:"logFile" json:"logFile"` + ReadTimeout time.Duration `yaml:"readTimeout" json:"readTimeout"` + WriteTimeout time.Duration `yaml:"writeTimeout" json:"writeTimeout"` + ReadBufferCount int `yaml:"readBufferCount" json:"readBufferCount"` + API bool `yaml:"api" json:"api"` + APIAddress string `yaml:"apiAddress" json:"apiAddress"` + Metrics bool `yaml:"metrics" json:"metrics"` + MetricsAddress string `yaml:"metricsAddress" json:"metricsAddress"` + PPROF bool `yaml:"pprof" json:"pprof"` + PPROFAddress string `yaml:"pprofAddress" json:"pprofAddress"` + RunOnConnect string `yaml:"runOnConnect" json:"runOnConnect"` + RunOnConnectRestart bool `yaml:"runOnConnectRestart" json:"runOnConnectRestart"` // rtsp - RTSPDisable bool `yaml:"rtspDisable"` - Protocols []string `yaml:"protocols"` + RTSPDisable bool `yaml:"rtspDisable" json:"rtspDisable"` + Protocols []string `yaml:"protocols" json:"protocols"` ProtocolsParsed map[Protocol]struct{} `yaml:"-" json:"-"` - Encryption string `yaml:"encryption"` + Encryption string `yaml:"encryption" json:"encryption"` EncryptionParsed Encryption `yaml:"-" json:"-"` - RTSPAddress string `yaml:"rtspAddress"` - RTSPSAddress string `yaml:"rtspsAddress"` - RTPAddress string `yaml:"rtpAddress"` - RTCPAddress string `yaml:"rtcpAddress"` - MulticastIPRange string `yaml:"multicastIPRange"` - MulticastRTPPort int `yaml:"multicastRTPPort"` - MulticastRTCPPort int `yaml:"multicastRTCPPort"` - ServerKey string `yaml:"serverKey"` - ServerCert string `yaml:"serverCert"` - AuthMethods []string `yaml:"authMethods"` + RTSPAddress string `yaml:"rtspAddress" json:"rtspAddress"` + RTSPSAddress string `yaml:"rtspsAddress" json:"rtspsAddress"` + RTPAddress string `yaml:"rtpAddress" json:"rtpAddress"` + RTCPAddress string `yaml:"rtcpAddress" json:"rtcpAddress"` + MulticastIPRange string `yaml:"multicastIPRange" json:"multicastIPRange"` + MulticastRTPPort int `yaml:"multicastRTPPort" json:"multicastRTPPort"` + MulticastRTCPPort int `yaml:"multicastRTCPPort" json:"multicastRTCPPort"` + ServerKey string `yaml:"serverKey" json:"serverKey"` + ServerCert string `yaml:"serverCert" json:"serverCert"` + AuthMethods []string `yaml:"authMethods" json:"authMethods"` AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` - ReadBufferSize int `yaml:"readBufferSize"` + ReadBufferSize int `yaml:"readBufferSize" json:"readBufferSize"` // rtmp - RTMPDisable bool `yaml:"rtmpDisable"` - RTMPAddress string `yaml:"rtmpAddress"` + RTMPDisable bool `yaml:"rtmpDisable" json:"rtmpDisable"` + RTMPAddress string `yaml:"rtmpAddress" json:"rtmpAddress"` // hls - HLSDisable bool `yaml:"hlsDisable"` - HLSAddress string `yaml:"hlsAddress"` - HLSAlwaysRemux bool `yaml:"hlsAlwaysRemux"` - HLSSegmentCount int `yaml:"hlsSegmentCount"` - HLSSegmentDuration time.Duration `yaml:"hlsSegmentDuration"` - HLSAllowOrigin string `yaml:"hlsAllowOrigin"` + HLSDisable bool `yaml:"hlsDisable" json:"hlsDisable"` + HLSAddress string `yaml:"hlsAddress" json:"hlsAddress"` + HLSAlwaysRemux bool `yaml:"hlsAlwaysRemux" json:"hlsAlwaysRemux"` + HLSSegmentCount int `yaml:"hlsSegmentCount" json:"hlsSegmentCount"` + HLSSegmentDuration time.Duration `yaml:"hlsSegmentDuration" json:"hlsSegmentDuration"` + HLSAllowOrigin string `yaml:"hlsAllowOrigin" json:"hlsAllowOrigin"` // paths - Paths map[string]*PathConf `yaml:"paths"` + Paths map[string]*PathConf `yaml:"paths" json:"paths"` } -func (conf *Conf) fillAndCheck() error { +// Load loads a Conf. +func Load(fpath string) (*Conf, bool, error) { + conf := &Conf{} + + // read from file + found, err := func() (bool, error) { + // rtsp-simple-server.yml is optional + if fpath == "rtsp-simple-server.yml" { + if _, err := os.Stat(fpath); err != nil { + return false, nil + } + } + + byts, err := ioutil.ReadFile(fpath) + if err != nil { + return true, err + } + + if key, ok := os.LookupEnv("RTSP_CONFKEY"); ok { + byts, err = decrypt(key, byts) + if err != nil { + return true, err + } + } + + err = yaml.Unmarshal(byts, conf) + if err != nil { + return true, err + } + + return true, nil + }() + if err != nil { + return nil, false, err + } + + // read from environment + err = confenv.Load("RTSP", conf) + if err != nil { + return nil, false, err + } + + err = conf.CheckAndFillMissing() + if err != nil { + return nil, false, err + } + + return conf, found, nil +} + +// CheckAndFillMissing checks the configuration for errors and fill missing fields. +func (conf *Conf) CheckAndFillMissing() error { if conf.LogLevel == "" { conf.LogLevel = "info" } @@ -158,6 +211,10 @@ func (conf *Conf) fillAndCheck() error { conf.ReadBufferCount = 512 } + if conf.APIAddress == "" { + conf.APIAddress = ":9997" + } + if conf.MetricsAddress == "" { conf.MetricsAddress = ":9998" } @@ -290,7 +347,7 @@ func (conf *Conf) fillAndCheck() error { pconf = conf.Paths[name] } - err := pconf.fillAndCheck(name) + err := pconf.checkAndFillMissing(name) if err != nil { return err } @@ -298,53 +355,3 @@ func (conf *Conf) fillAndCheck() error { return nil } - -// Load loads a Conf. -func Load(fpath string) (*Conf, bool, error) { - conf := &Conf{} - - // read from file - found, err := func() (bool, error) { - // rtsp-simple-server.yml is optional - if fpath == "rtsp-simple-server.yml" { - if _, err := os.Stat(fpath); err != nil { - return false, nil - } - } - - byts, err := ioutil.ReadFile(fpath) - if err != nil { - return true, err - } - - if key, ok := os.LookupEnv("RTSP_CONFKEY"); ok { - byts, err = decrypt(key, byts) - if err != nil { - return true, err - } - } - - err = yaml.Unmarshal(byts, conf) - if err != nil { - return true, err - } - - return true, nil - }() - if err != nil { - return nil, false, err - } - - // read from environment - err = confenv.Load("RTSP", conf) - if err != nil { - return nil, false, err - } - - err = conf.fillAndCheck() - if err != nil { - return nil, false, err - } - - return conf, found, nil -} diff --git a/internal/conf/path.go b/internal/conf/path.go index 35282059..5da8eb01 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -69,42 +69,42 @@ type PathConf struct { Regexp *regexp.Regexp `yaml:"-" json:"-"` // source - Source string `yaml:"source"` - SourceProtocol string `yaml:"sourceProtocol"` + Source string `yaml:"source" json:"source"` + SourceProtocol string `yaml:"sourceProtocol" json:"sourceProtocol"` SourceProtocolParsed *gortsplib.ClientProtocol `yaml:"-" json:"-"` - SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable"` - SourceFingerprint string `yaml:"sourceFingerprint"` - SourceOnDemand bool `yaml:"sourceOnDemand"` - SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"` - SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"` - SourceRedirect string `yaml:"sourceRedirect"` - DisablePublisherOverride bool `yaml:"disablePublisherOverride"` - Fallback string `yaml:"fallback"` + SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable" json:"sourceAnyPortEnable"` + SourceFingerprint string `yaml:"sourceFingerprint" json:"sourceFingerprint"` + SourceOnDemand bool `yaml:"sourceOnDemand" json:"sourceOnDemand"` + SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout" json:"sourceOnDemandStartTimeout"` + SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter" json:"sourceOnDemandCloseAfter"` + SourceRedirect string `yaml:"sourceRedirect" json:"sourceRedirect"` + DisablePublisherOverride bool `yaml:"disablePublisherOverride" json:"disablePublisherOverride"` + Fallback string `yaml:"fallback" json:"fallback"` // authentication - PublishUser string `yaml:"publishUser"` - PublishPass string `yaml:"publishPass"` - PublishIPs []string `yaml:"publishIPs"` + PublishUser string `yaml:"publishUser" json:"publishUser"` + PublishPass string `yaml:"publishPass" json:"publishPass"` + PublishIPs []string `yaml:"publishIPs" json:"publishIPs"` PublishIPsParsed []interface{} `yaml:"-" json:"-"` - ReadUser string `yaml:"readUser"` - ReadPass string `yaml:"readPass"` - ReadIPs []string `yaml:"readIPs"` + ReadUser string `yaml:"readUser" json:"readUser"` + ReadPass string `yaml:"readPass" json:"readPass"` + ReadIPs []string `yaml:"readIPs" json:"readIPs"` ReadIPsParsed []interface{} `yaml:"-" json:"-"` // custom commands - RunOnInit string `yaml:"runOnInit"` - RunOnInitRestart bool `yaml:"runOnInitRestart"` - RunOnDemand string `yaml:"runOnDemand"` - RunOnDemandRestart bool `yaml:"runOnDemandRestart"` - RunOnDemandStartTimeout time.Duration `yaml:"runOnDemandStartTimeout"` - RunOnDemandCloseAfter time.Duration `yaml:"runOnDemandCloseAfter"` - RunOnPublish string `yaml:"runOnPublish"` - RunOnPublishRestart bool `yaml:"runOnPublishRestart"` - RunOnRead string `yaml:"runOnRead"` - RunOnReadRestart bool `yaml:"runOnReadRestart"` + RunOnInit string `yaml:"runOnInit" json:"runOnInit"` + RunOnInitRestart bool `yaml:"runOnInitRestart" json:"runOnInitRestart"` + RunOnDemand string `yaml:"runOnDemand" json:"runOnDemand"` + RunOnDemandRestart bool `yaml:"runOnDemandRestart" json:"runOnDemandRestart"` + RunOnDemandStartTimeout time.Duration `yaml:"runOnDemandStartTimeout" json:"runOnDemandStartTimeout"` + RunOnDemandCloseAfter time.Duration `yaml:"runOnDemandCloseAfter" json:"runOnDemandCloseAfter"` + RunOnPublish string `yaml:"runOnPublish" json:"runOnPublish"` + RunOnPublishRestart bool `yaml:"runOnPublishRestart" json:"runOnPublishRestart"` + RunOnRead string `yaml:"runOnRead" json:"runOnRead"` + RunOnReadRestart bool `yaml:"runOnReadRestart" json:"runOnReadRestart"` } -func (pconf *PathConf) fillAndCheck(name string) error { +func (pconf *PathConf) checkAndFillMissing(name string) error { if name == "" { return fmt.Errorf("path name can not be empty") } diff --git a/internal/core/api.go b/internal/core/api.go new file mode 100644 index 00000000..5188b44f --- /dev/null +++ b/internal/core/api.go @@ -0,0 +1,543 @@ +package core + +import ( + "context" + "encoding/json" + "net" + "net/http" + "reflect" + "sync" + "time" + + "github.com/gin-gonic/gin" + + "github.com/aler9/rtsp-simple-server/internal/conf" + "github.com/aler9/rtsp-simple-server/internal/logger" +) + +func fillStruct(dest interface{}, source interface{}) { + rvsource := reflect.ValueOf(source) + rvdest := reflect.ValueOf(dest) + nf := rvsource.NumField() + for i := 0; i < nf; i++ { + fnew := rvsource.Field(i) + if !fnew.IsNil() { + f := rvdest.Elem().FieldByName(rvsource.Type().Field(i).Name) + if f.Kind() == reflect.Ptr { + f.Set(fnew) + } else { + f.Set(fnew.Elem()) + } + } + } +} + +func cloneStruct(dest interface{}, source interface{}) { + enc, _ := json.Marshal(dest) + _ = json.Unmarshal(enc, source) +} + +func loadConfData(ctx *gin.Context) (interface{}, error) { + var in struct { + // general + LogLevel *string `json:"logLevel"` + LogDestinations *[]string `json:"logDestinations"` + LogFile *string `json:"logFile"` + ReadTimeout *time.Duration `json:"readTimeout"` + WriteTimeout *time.Duration `json:"writeTimeout"` + ReadBufferCount *int `json:"readBufferCount"` + API *bool `json:"api"` + APIAddress *string `json:"apiAddress"` + Metrics *bool `json:"metrics"` + MetricsAddress *string `json:"metricsAddress"` + PPROF *bool `json:"pprof"` + PPROFAddress *string `json:"pprofAddress"` + RunOnConnect *string `json:"runOnConnect"` + RunOnConnectRestart *bool `json:"runOnConnectRestart"` + + // rtsp + RTSPDisable *bool `json:"rtspDisable"` + Protocols *[]string `json:"protocols"` + Encryption *string `json:"encryption"` + RTSPAddress *string `json:"rtspAddress"` + RTSPSAddress *string `json:"rtspsAddress"` + RTPAddress *string `json:"rtpAddress"` + RTCPAddress *string `json:"rtcpAddress"` + MulticastIPRange *string `json:"multicastIPRange"` + MulticastRTPPort *int `json:"multicastRTPPort"` + MulticastRTCPPort *int `json:"multicastRTCPPort"` + ServerKey *string `json:"serverKey"` + ServerCert *string `json:"serverCert"` + AuthMethods *[]string `json:"authMethods"` + ReadBufferSize *int `json:"readBufferSize"` + + // rtmp + RTMPDisable *bool `json:"rtmpDisable"` + RTMPAddress *string `json:"rtmpAddress"` + + // hls + HLSDisable *bool `json:"hlsDisable"` + HLSAddress *string `json:"hlsAddress"` + HLSAlwaysRemux *bool `json:"hlsAlwaysRemux"` + HLSSegmentCount *int `json:"hlsSegmentCount"` + HLSSegmentDuration *time.Duration `json:"hlsSegmentDuration"` + HLSAllowOrigin *string `json:"hlsAllowOrigin"` + } + err := json.NewDecoder(ctx.Request.Body).Decode(&in) + if err != nil { + return nil, err + } + + return in, err +} + +func loadConfPathData(ctx *gin.Context) (interface{}, error) { + var in struct { + // source + Source *string `json:"source"` + SourceProtocol *string `json:"sourceProtocol"` + SourceAnyPortEnable *bool `json:"sourceAnyPortEnable"` + SourceFingerprint *string `json:"sourceFingerprint"` + SourceOnDemand *bool `json:"sourceOnDemand"` + SourceOnDemandStartTimeout *time.Duration `json:"sourceOnDemandStartTimeout"` + SourceOnDemandCloseAfter *time.Duration `json:"sourceOnDemandCloseAfter"` + SourceRedirect *string `json:"sourceRedirect"` + DisablePublisherOverride *bool `json:"disablePublisherOverride"` + Fallback *string `json:"fallback"` + + // authentication + PublishUser *string `json:"publishUser"` + PublishPass *string `json:"publishPass"` + PublishIPs *[]string `json:"publishIPs"` + ReadUser *string `json:"readUser"` + ReadPass *string `json:"readPass"` + ReadIPs *[]string `json:"readIPs"` + + // custom commands + RunOnInit *string `json:"runOnInit"` + RunOnInitRestart *bool `json:"runOnInitRestart"` + RunOnDemand *string `json:"runOnDemand"` + RunOnDemandRestart *bool `json:"runOnDemandRestart"` + RunOnDemandStartTimeout *time.Duration `json:"runOnDemandStartTimeout"` + RunOnDemandCloseAfter *time.Duration `json:"runOnDemandCloseAfter"` + RunOnPublish *string `json:"runOnPublish"` + RunOnPublishRestart *bool `json:"runOnPublishRestart"` + RunOnRead *string `json:"runOnRead"` + RunOnReadRestart *bool `json:"runOnReadRestart"` + } + err := json.NewDecoder(ctx.Request.Body).Decode(&in) + if err != nil { + return nil, err + } + + return in, err +} + +type apiPathsItem struct { + Name string `json:"name"` + ConfName string `json:"confName"` + Conf *conf.PathConf `json:"conf"` + Source interface{} `json:"source"` + SourceReady bool `json:"sourceReady"` + Readers []interface{} `json:"readers"` +} + +type apiPathsListData struct { + Items []apiPathsItem `json:"items"` +} + +type apiPathsListRes1 struct { + Paths map[string]*path + Err error +} + +type apiPathsListReq1 struct { + Res chan apiPathsListRes1 +} + +type apiPathsListRes2 struct { + Err error +} + +type apiPathsListReq2 struct { + Data *apiPathsListData + Res chan apiPathsListRes2 +} + +type apiRTSPSessionsItem struct { + ID string `json:"id"` + RemoteAddr string `json:"remoteAddr"` +} + +type apiRTSPSessionsListData struct { + Items []apiRTSPSessionsItem `json:"items"` +} + +type apiRTSPSessionsListRes struct { + Err error +} + +type apiRTSPSessionsListReq struct { + Data *apiRTSPSessionsListData +} + +type apiRTSPSessionsKickRes struct { + Err error +} + +type apiRTSPSessionsKickReq struct { + ID string +} + +type apiRTMPConnsListItem struct { + ID string `json:"id"` + RemoteAddr string `json:"remoteAddr"` +} + +type apiRTMPConnsListData struct { + Items []apiRTMPConnsListItem `json:"items"` +} + +type apiRTMPConnsListRes struct { + Err error +} + +type apiRTMPConnsListReq struct { + Data *apiRTMPConnsListData + Res chan apiRTMPConnsListRes +} + +type apiRTMPConnsKickRes struct { + Err error +} + +type apiRTMPConnsKickReq struct { + ID string + Res chan apiRTMPConnsKickRes +} + +type apiParent interface { + Log(logger.Level, string, ...interface{}) + OnAPIConfigSet(conf *conf.Conf) +} + +type api struct { + conf *conf.Conf + pathManager *pathManager + rtspServer *rtspServer + rtspsServer *rtspServer + rtmpServer *rtmpServer + parent apiParent + + mutex sync.Mutex + s *http.Server +} + +func newAPI( + address string, + conf *conf.Conf, + pathManager *pathManager, + rtspServer *rtspServer, + rtspsServer *rtspServer, + rtmpServer *rtmpServer, + parent apiParent, +) (*api, error) { + ln, err := net.Listen("tcp", address) + if err != nil { + return nil, err + } + + a := &api{ + conf: conf, + pathManager: pathManager, + rtspServer: rtspServer, + rtspsServer: rtspsServer, + rtmpServer: rtmpServer, + parent: parent, + } + + gin.SetMode(gin.ReleaseMode) + router := gin.New() + + router.GET("/config/get", a.onConfigGet) + router.POST("/config/set", a.onConfigSet) + router.POST("/config/paths/add/:name", a.onConfigPathsAdd) + router.POST("/config/paths/edit/:name", a.onConfigPathsEdit) + router.POST("/config/paths/delete/:name", a.onConfigPathsDelete) + router.GET("/paths/list", a.onPathsList) + router.GET("/rtspsessions/list", a.onRTSPSessionsList) + router.POST("/rtspsessions/kick/:id", a.onRTSPSessionsKick) + router.GET("/rtmpconns/list", a.onRTMPConnsList) + router.POST("/rtmpconns/kick/:id", a.onRTMPConnsKick) + + a.s = &http.Server{ + Handler: router, + } + + go a.s.Serve(ln) + + a.log(logger.Info, "listener opened on "+address) + + return a, nil +} + +func (a *api) close() { + a.s.Shutdown(context.Background()) + a.log(logger.Info, "closed") +} + +// Log is the main logging function. +func (a *api) log(level logger.Level, format string, args ...interface{}) { + a.parent.Log(level, "[API] "+format, args...) +} + +func (a *api) onConfigGet(ctx *gin.Context) { + a.mutex.Lock() + c := a.conf + a.mutex.Unlock() + + ctx.JSON(http.StatusOK, c) +} + +func (a *api) onConfigSet(ctx *gin.Context) { + in, err := loadConfData(ctx) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + a.mutex.Lock() + var newConf conf.Conf + cloneStruct(a.conf, &newConf) + a.mutex.Unlock() + + fillStruct(&newConf, in) + + err = newConf.CheckAndFillMissing() + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + // since reloading the configuration can cause the shutdown of the API, + // call it in a goroutine + go a.parent.OnAPIConfigSet(&newConf) + + ctx.Status(http.StatusOK) +} + +func (a *api) onConfigPathsAdd(ctx *gin.Context) { + in, err := loadConfPathData(ctx) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + name := ctx.Param("name") + + a.mutex.Lock() + var newConf conf.Conf + cloneStruct(a.conf, &newConf) + a.mutex.Unlock() + + if _, ok := newConf.Paths[name]; ok { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + newConfPath := &conf.PathConf{} + + fillStruct(newConfPath, in) + newConf.Paths[name] = newConfPath + + err = newConf.CheckAndFillMissing() + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + // since reloading the configuration can cause the shutdown of the API, + // call it in a goroutine + go a.parent.OnAPIConfigSet(&newConf) + + ctx.Status(http.StatusOK) +} + +func (a *api) onConfigPathsEdit(ctx *gin.Context) { + in, err := loadConfPathData(ctx) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + name := ctx.Param("name") + + a.mutex.Lock() + var newConf conf.Conf + cloneStruct(a.conf, &newConf) + a.mutex.Unlock() + + newConfPath, ok := newConf.Paths[name] + if !ok { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + fillStruct(newConfPath, in) + + err = newConf.CheckAndFillMissing() + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + // since reloading the configuration can cause the shutdown of the API, + // call it in a goroutine + go a.parent.OnAPIConfigSet(&newConf) + + ctx.Status(http.StatusOK) +} + +func (a *api) onConfigPathsDelete(ctx *gin.Context) { + name := ctx.Param("name") + + a.mutex.Lock() + var newConf conf.Conf + cloneStruct(a.conf, &newConf) + a.mutex.Unlock() + + if _, ok := newConf.Paths[name]; !ok { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + delete(newConf.Paths, name) + + err := newConf.CheckAndFillMissing() + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + // since reloading the configuration can cause the shutdown of the API, + // call it in a goroutine + go a.parent.OnAPIConfigSet(&newConf) + + ctx.Status(http.StatusOK) +} + +func (a *api) onPathsList(ctx *gin.Context) { + data := apiPathsListData{ + Items: []apiPathsItem{}, + } + + res := a.pathManager.OnAPIPathsList(apiPathsListReq1{}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + for _, pa := range res.Paths { + pa.OnAPIPathsList(apiPathsListReq2{Data: &data}) + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSessionsList(ctx *gin.Context) { + if a.rtspServer == nil && a.rtspsServer == nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + data := apiRTSPSessionsListData{ + Items: []apiRTSPSessionsItem{}, + } + + if a.rtspServer != nil { + res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + } + + if a.rtspsServer != nil { + res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSessionsKick(ctx *gin.Context) { + if a.rtspServer == nil && a.rtspsServer == nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + id := ctx.Param("id") + + if a.rtspServer != nil { + res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + if res.Err == nil { + ctx.Status(http.StatusOK) + return + } + } + + if a.rtspsServer != nil { + res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + if res.Err != nil { + ctx.Status(http.StatusOK) + return + } + } + + ctx.AbortWithStatus(http.StatusNotFound) +} + +func (a *api) onRTMPConnsList(ctx *gin.Context) { + if a.rtmpServer == nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + data := apiRTMPConnsListData{ + Items: []apiRTMPConnsListItem{}, + } + + res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{Data: &data}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, data) +} + +// OnConfReload is called by core. +func (a *api) OnConfReload(conf *conf.Conf) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.conf = conf +} + +func (a *api) onRTMPConnsKick(ctx *gin.Context) { + if a.rtmpServer == nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + id := ctx.Param("id") + + res := a.rtmpServer.OnAPIRTMPConnsKick(apiRTMPConnsKickReq{ID: id}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + ctx.Status(http.StatusOK) +} diff --git a/internal/core/core.go b/internal/core/core.go index b94931f2..22c894ec 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -19,21 +19,25 @@ var version = "v0.0.0" // Core is an instance of rtsp-simple-server. type Core struct { - ctx context.Context - ctxCancel func() - confPath string - conf *conf.Conf - confFound bool - stats *stats - logger *logger.Logger - metrics *metrics - pprof *pprof - pathManager *pathManager - rtspServerPlain *rtspServer - rtspServerTLS *rtspServer - rtmpServer *rtmpServer - hlsServer *hlsServer - confWatcher *confwatcher.ConfWatcher + ctx context.Context + ctxCancel func() + confPath string + conf *conf.Conf + confFound bool + stats *stats + logger *logger.Logger + metrics *metrics + pprof *pprof + pathManager *pathManager + rtspServer *rtspServer + rtspsServer *rtspServer + rtmpServer *rtmpServer + hlsServer *hlsServer + api *api + confWatcher *confwatcher.ConfWatcher + + // in + apiConfigSet chan *conf.Conf // out done chan struct{} @@ -62,10 +66,11 @@ func New(args []string) (*Core, bool) { ctx, ctxCancel := context.WithCancel(context.Background()) p := &Core{ - ctx: ctx, - ctxCancel: ctxCancel, - confPath: *argConfPath, - done: make(chan struct{}), + ctx: ctx, + ctxCancel: ctxCancel, + confPath: *argConfPath, + apiConfigSet: make(chan *conf.Conf), + done: make(chan struct{}), } var err error @@ -130,7 +135,24 @@ outer: for { select { case <-confChanged: - err := p.reloadConf() + p.Log(logger.Info, "reloading configuration (file changed)") + + newConf, _, err := conf.Load(p.confPath) + if err != nil { + p.Log(logger.Info, "ERR: %s", err) + break outer + } + + err = p.reloadConf(newConf) + if err != nil { + p.Log(logger.Info, "ERR: %s", err) + break outer + } + + case newConf := <-p.apiConfigSet: + p.Log(logger.Info, "reloading configuration (API request)") + + err := p.reloadConf(newConf) if err != nil { p.Log(logger.Info, "ERR: %s", err) break outer @@ -213,10 +235,10 @@ func (p *Core) createResources(initial bool) error { if !p.conf.RTSPDisable && (p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional) { - if p.rtspServerPlain == nil { + if p.rtspServer == nil { _, useUDP := p.conf.ProtocolsParsed[conf.ProtocolUDP] _, useMulticast := p.conf.ProtocolsParsed[conf.ProtocolMulticast] - p.rtspServerPlain, err = newRTSPServer( + p.rtspServer, err = newRTSPServer( p.ctx, p.conf.RTSPAddress, p.conf.AuthMethodsParsed, @@ -250,8 +272,8 @@ func (p *Core) createResources(initial bool) error { if !p.conf.RTSPDisable && (p.conf.EncryptionParsed == conf.EncryptionStrict || p.conf.EncryptionParsed == conf.EncryptionOptional) { - if p.rtspServerTLS == nil { - p.rtspServerTLS, err = newRTSPServer( + if p.rtspsServer == nil { + p.rtspsServer, err = newRTSPServer( p.ctx, p.conf.RTSPSAddress, p.conf.AuthMethodsParsed, @@ -321,6 +343,22 @@ func (p *Core) createResources(initial bool) error { } } + if p.conf.API { + if p.api == nil { + p.api, err = newAPI( + p.conf.APIAddress, + p.conf, + p.pathManager, + p.rtspServer, + p.rtspsServer, + p.rtmpServer, + p) + if err != nil { + return err + } + } + } + return nil } @@ -366,7 +404,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { p.pathManager.OnConfReload(newConf.Paths) } - closeServerPlain := false + closeRTSPServer := false if newConf == nil || newConf.RTSPDisable != p.conf.RTSPDisable || newConf.EncryptionParsed != p.conf.EncryptionParsed || @@ -387,10 +425,10 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeStats || closePathManager { - closeServerPlain = true + closeRTSPServer = true } - closeServerTLS := false + closeRTSPSServer := false if newConf == nil || newConf.RTSPDisable != p.conf.RTSPDisable || newConf.EncryptionParsed != p.conf.EncryptionParsed || @@ -407,10 +445,10 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeStats || closePathManager { - closeServerTLS = true + closeRTSPSServer = true } - closeServerRTMP := false + closeRTMPServer := false if newConf == nil || newConf.RTMPDisable != p.conf.RTMPDisable || newConf.RTMPAddress != p.conf.RTMPAddress || @@ -422,10 +460,10 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeStats || closePathManager { - closeServerRTMP = true + closeRTMPServer = true } - closeServerHLS := false + closeHLSServer := false if newConf == nil || newConf.HLSDisable != p.conf.HLSDisable || newConf.HLSAddress != p.conf.HLSAddress || @@ -436,17 +474,37 @@ func (p *Core) closeResources(newConf *conf.Conf) { newConf.ReadBufferCount != p.conf.ReadBufferCount || closeStats || closePathManager { - closeServerHLS = true + closeHLSServer = true + } + + closeAPI := false + if newConf == nil || + newConf.API != p.conf.API || + newConf.APIAddress != p.conf.APIAddress || + closePathManager || + closeRTSPServer || + closeRTSPSServer || + closeRTMPServer { + closeAPI = true + } + + if p.api != nil { + if closeAPI { + p.api.close() + p.api = nil + } else { + p.api.OnConfReload(newConf) + } } - if closeServerTLS && p.rtspServerTLS != nil { - p.rtspServerTLS.close() - p.rtspServerTLS = nil + if closeRTSPSServer && p.rtspsServer != nil { + p.rtspsServer.close() + p.rtspsServer = nil } - if closeServerPlain && p.rtspServerPlain != nil { - p.rtspServerPlain.close() - p.rtspServerPlain = nil + if closeRTSPServer && p.rtspServer != nil { + p.rtspServer.close() + p.rtspServer = nil } if closePathManager && p.pathManager != nil { @@ -454,12 +512,12 @@ func (p *Core) closeResources(newConf *conf.Conf) { p.pathManager = nil } - if closeServerHLS && p.hlsServer != nil { + if closeHLSServer && p.hlsServer != nil { p.hlsServer.close() p.hlsServer = nil } - if closeServerRTMP && p.rtmpServer != nil { + if closeRTMPServer && p.rtmpServer != nil { p.rtmpServer.close() p.rtmpServer = nil } @@ -484,16 +542,17 @@ func (p *Core) closeResources(newConf *conf.Conf) { } } -func (p *Core) reloadConf() error { - p.Log(logger.Info, "reloading configuration") - - newConf, _, err := conf.Load(p.confPath) - if err != nil { - return err - } - +func (p *Core) reloadConf(newConf *conf.Conf) error { p.closeResources(newConf) p.conf = newConf return p.createResources(false) } + +// OnAPIConfigSet is called by api. +func (p *Core) OnAPIConfigSet(conf *conf.Conf) { + select { + case p.apiConfigSet <- conf: + case <-p.ctx.Done(): + } +} diff --git a/internal/core/hls_remuxer.go b/internal/core/hls_remuxer.go index 44ea80c4..5edada55 100644 --- a/internal/core/hls_remuxer.go +++ b/internal/core/hls_remuxer.go @@ -495,3 +495,10 @@ func (r *hlsRemuxer) OnReaderFrame(trackID int, streamType gortsplib.StreamType, r.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload}) } } + +// OnReaderAPIDescribe implements reader. +func (r *hlsRemuxer) OnReaderAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"hlsremuxer"} +} diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 03d09e24..cbf7b3ce 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -94,6 +94,7 @@ func (s *hlsServer) Log(level logger.Level, format string, args ...interface{}) func (s *hlsServer) close() { s.ctxCancel() s.wg.Wait() + s.Log(logger.Info, "closed") } func (s *hlsServer) run() { diff --git a/internal/core/path.go b/internal/core/path.go index efd93216..1ecba5a6 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -63,7 +63,12 @@ type pathRTSPSession interface { type sourceRedirect struct{} -func (*sourceRedirect) IsSource() {} +// OnSourceAPIDescribe implements source. +func (*sourceRedirect) OnSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"redirect"} +} type pathReaderState int @@ -241,6 +246,7 @@ type path struct { readerSetupPlay chan pathReaderSetupPlayReq readerPlay chan pathReaderPlayReq readerPause chan pathReaderPauseReq + apiPathsList chan apiPathsListReq2 } func newPath( @@ -287,8 +293,11 @@ func newPath( readerSetupPlay: make(chan pathReaderSetupPlayReq), readerPlay: make(chan pathReaderPlayReq), readerPause: make(chan pathReaderPauseReq), + apiPathsList: make(chan apiPathsListReq2), } + pa.Log(logger.Info, "created") + pa.wg.Add(1) go pa.run() @@ -297,6 +306,7 @@ func newPath( func (pa *path) Close() { pa.ctxCancel() + pa.Log(logger.Info, "destroyed") } // Log is the main logging function. @@ -415,6 +425,28 @@ outer: case req := <-pa.readerPause: pa.onReaderPause(req) + case req := <-pa.apiPathsList: + req.Data.Items = append(req.Data.Items, apiPathsItem{ + Name: pa.name, + ConfName: pa.confName, + Conf: pa.conf, + Source: func() interface{} { + if pa.source == nil { + return nil + } + return pa.source.OnSourceAPIDescribe() + }(), + SourceReady: pa.sourceReady, + Readers: func() []interface{} { + ret := []interface{}{} + for r := range pa.readers { + ret = append(ret, r.OnReaderAPIDescribe()) + } + return ret + }(), + }) + req.Res <- apiPathsListRes2{} + case <-pa.ctx.Done(): break outer } @@ -940,3 +972,14 @@ func (pa *path) OnSourceFrame(trackID int, streamType gortsplib.StreamType, payl // forward to non-RTSP readers pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload) } + +// OnAPIPathsList is called by api. +func (pa *path) OnAPIPathsList(req apiPathsListReq2) apiPathsListRes2 { + req.Res = make(chan apiPathsListRes2) + select { + case pa.apiPathsList <- req: + return <-req.Res + case <-pa.ctx.Done(): + return apiPathsListRes2{Err: fmt.Errorf("terminated")} + } +} diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 5f9098f3..d59a508a 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -41,6 +41,7 @@ type pathManager struct { readerSetupPlay chan pathReaderSetupPlayReq publisherAnnounce chan pathPublisherAnnounceReq hlsServerSet chan *hlsServer + apiPathsList chan apiPathsListReq1 } func newPathManager( @@ -74,6 +75,7 @@ func newPathManager( readerSetupPlay: make(chan pathReaderSetupPlayReq), publisherAnnounce: make(chan pathPublisherAnnounceReq), hlsServerSet: make(chan *hlsServer), + apiPathsList: make(chan apiPathsListReq1), } for pathName, pathConf := range pm.pathConfs { @@ -238,6 +240,17 @@ outer: case s := <-pm.hlsServerSet: pm.hlsServer = s + case req := <-pm.apiPathsList: + paths := make(map[string]*path) + + for name, pa := range pm.paths { + paths[name] = pa + } + + req.Res <- apiPathsListRes1{ + Paths: paths, + } + case <-pm.ctx.Done(): break outer } @@ -396,3 +409,14 @@ func (pm *pathManager) OnHLSServer(s *hlsServer) { case <-pm.ctx.Done(): } } + +// OnAPIPathsList is called by api. +func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 { + req.Res = make(chan apiPathsListRes1) + select { + case pm.apiPathsList <- req: + return <-req.Res + case <-pm.ctx.Done(): + return apiPathsListRes1{Err: fmt.Errorf("terminated")} + } +} diff --git a/internal/core/reader.go b/internal/core/reader.go index e85d10dd..12baf45a 100644 --- a/internal/core/reader.go +++ b/internal/core/reader.go @@ -9,4 +9,5 @@ type reader interface { Close() OnReaderAccepted() OnReaderFrame(int, gortsplib.StreamType, []byte) + OnReaderAPIDescribe() interface{} } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 014eefd4..70af1350 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -53,6 +53,7 @@ type rtmpConnParent interface { } type rtmpConn struct { + id string rtspAddress string readTimeout time.Duration writeTimeout time.Duration @@ -73,6 +74,7 @@ type rtmpConn struct { func newRTMPConn( parentCtx context.Context, + id string, rtspAddress string, readTimeout time.Duration, writeTimeout time.Duration, @@ -87,6 +89,7 @@ func newRTMPConn( ctx, ctxCancel := context.WithCancel(parentCtx) c := &rtmpConn{ + id: id, rtspAddress: rtspAddress, readTimeout: readTimeout, writeTimeout: writeTimeout, @@ -120,8 +123,15 @@ func (c *rtmpConn) Close() { c.ctxCancel() } -// IsSource implements source. -func (c *rtmpConn) IsSource() {} +// ID returns the ID of the Conn. +func (c *rtmpConn) ID() string { + return c.id +} + +// RemoteAddr returns the remote address of the Conn. +func (c *rtmpConn) RemoteAddr() net.Addr { + return c.conn.NetConn().RemoteAddr() +} func (c *rtmpConn) log(level logger.Level, format string, args ...interface{}) { c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) @@ -517,6 +527,22 @@ func (c *rtmpConn) OnReaderFrame(trackID int, streamType gortsplib.StreamType, p } } +// OnReaderAPIDescribe implements reader. +func (c *rtmpConn) OnReaderAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + ID string `json:"id"` + }{"rtmpconn", c.id} +} + +// OnSourceAPIDescribe implements source. +func (c *rtmpConn) OnSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + ID string `json:"id"` + }{"rtmpconn", c.id} +} + // OnPublisherAccepted implements publisher. func (c *rtmpConn) OnPublisherAccepted(tracksLen int) { c.log(logger.Info, "is publishing to path '%s', %d %s", diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index a9d0ece2..55300ed2 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -2,7 +2,11 @@ package core import ( "context" + "crypto/rand" + "encoding/binary" + "fmt" "net" + "strconv" "sync" "time" @@ -31,7 +35,9 @@ type rtmpServer struct { conns map[*rtmpConn]struct{} // in - connClose chan *rtmpConn + connClose chan *rtmpConn + apiRTMPConnsList chan apiRTMPConnsListReq + apiRTMPConnsKick chan apiRTMPConnsKickReq } func newRTMPServer( @@ -68,6 +74,8 @@ func newRTMPServer( l: l, conns: make(map[*rtmpConn]struct{}), connClose: make(chan *rtmpConn), + apiRTMPConnsList: make(chan apiRTMPConnsListReq), + apiRTMPConnsKick: make(chan apiRTMPConnsKickReq), } s.Log(logger.Info, "listener opened on %s", address) @@ -85,6 +93,7 @@ func (s *rtmpServer) Log(level logger.Level, format string, args ...interface{}) func (s *rtmpServer) close() { s.ctxCancel() s.wg.Wait() + s.Log(logger.Info, "closed") } func (s *rtmpServer) run() { @@ -124,8 +133,11 @@ outer: break outer case nconn := <-connNew: + id, _ := s.newConnID() + c := newRTMPConn( s.ctx, + id, s.rtspAddress, s.readTimeout, s.writeTimeout, @@ -145,6 +157,31 @@ outer: } s.doConnClose(c) + case req := <-s.apiRTMPConnsList: + for c := range s.conns { + req.Data.Items = append(req.Data.Items, apiRTMPConnsListItem{ + ID: c.ID(), + RemoteAddr: c.RemoteAddr().String(), + }) + } + req.Res <- apiRTMPConnsListRes{} + + case req := <-s.apiRTMPConnsKick: + res := func() bool { + for c := range s.conns { + if c.ID() == req.ID { + c.Close() + return true + } + } + return false + }() + if res { + req.Res <- apiRTMPConnsKickRes{} + } else { + req.Res <- apiRTMPConnsKickRes{fmt.Errorf("not found")} + } + case <-s.ctx.Done(): break outer } @@ -159,6 +196,34 @@ outer: } } +func (s *rtmpServer) newConnID() (string, error) { + for { + b := make([]byte, 4) + _, err := rand.Read(b) + if err != nil { + return "", err + } + + u := binary.LittleEndian.Uint32(b) + u %= 899999999 + u += 100000000 + + id := strconv.FormatUint(uint64(u), 10) + + alreadyPresent := func() bool { + for c := range s.conns { + if c.ID() == id { + return true + } + } + return false + }() + if !alreadyPresent { + return id, nil + } + } +} + func (s *rtmpServer) doConnClose(c *rtmpConn) { delete(s.conns, c) c.ParentClose() @@ -172,3 +237,25 @@ func (s *rtmpServer) OnConnClose(c *rtmpConn) { case <-s.ctx.Done(): } } + +// OnAPIRTMPConnsList is called by api. +func (s *rtmpServer) OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes { + req.Res = make(chan apiRTMPConnsListRes) + select { + case s.apiRTMPConnsList <- req: + return <-req.Res + case <-s.ctx.Done(): + return apiRTMPConnsListRes{Err: fmt.Errorf("terminated")} + } +} + +// OnAPIRTMPConnsKick is called by api. +func (s *rtmpServer) OnAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes { + req.Res = make(chan apiRTMPConnsKickRes) + select { + case s.apiRTMPConnsKick <- req: + return <-req.Res + case <-s.ctx.Done(): + return apiRTMPConnsKickRes{Err: fmt.Errorf("terminated")} + } +} diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 61df3e20..39d7f888 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -78,12 +78,6 @@ func (s *rtmpSource) Close() { s.ctxCancel() } -// IsSource implements source. -func (s *rtmpSource) IsSource() {} - -// IsSourceStatic implements sourceStatic. -func (s *rtmpSource) IsSourceStatic() {} - func (s *rtmpSource) log(level logger.Level, format string, args ...interface{}) { s.parent.Log(level, "[rtmp source] "+format, args...) } @@ -269,3 +263,10 @@ func (s *rtmpSource) runInner() bool { return false } } + +// OnSourceAPIDescribe implements source. +func (*rtmpSource) OnSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"rtmpSource"} +} diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index 4646ebc8..1a1305ca 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "crypto/tls" "encoding/binary" + "fmt" "strconv" "sync" "time" @@ -17,30 +18,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) -func newSessionVisualID(sessions map[*gortsplib.ServerSession]*rtspSession) (string, error) { - for { - b := make([]byte, 4) - _, err := rand.Read(b) - if err != nil { - return "", err - } - - id := strconv.FormatUint(uint64(binary.LittleEndian.Uint32(b)), 10) - - alreadyPresent := func() bool { - for _, s := range sessions { - if s.VisualID() == id { - return true - } - } - return false - }() - if !alreadyPresent { - return id, nil - } - } -} - type rtspServerParent interface { Log(logger.Level, string, ...interface{}) } @@ -170,6 +147,7 @@ func (s *rtspServer) Log(level logger.Level, format string, args ...interface{}) func (s *rtspServer) close() { s.ctxCancel() s.wg.Wait() + s.Log(logger.Info, "closed") } func (s *rtspServer) run() { @@ -203,6 +181,34 @@ outer: s.srv.Close() } +func (s *rtspServer) newSessionID() (string, error) { + for { + b := make([]byte, 4) + _, err := rand.Read(b) + if err != nil { + return "", err + } + + u := binary.LittleEndian.Uint32(b) + u %= 899999999 + u += 100000000 + + id := strconv.FormatUint(uint64(u), 10) + + alreadyPresent := func() bool { + for _, s := range s.sessions { + if s.ID() == id { + return true + } + } + return false + }() + if !alreadyPresent { + return id, nil + } + } +} + // OnConnOpen implements gortsplib.ServerHandlerOnConnOpen. func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { c := newRTSPConn( @@ -253,14 +259,12 @@ func (s *rtspServer) OnResponse(sc *gortsplib.ServerConn, res *base.Response) { func (s *rtspServer) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { s.mutex.Lock() - // do not use ss.ID() in logs, since it allows to take ownership of a session - // use a new random ID - visualID, _ := newSessionVisualID(s.sessions) + id, _ := s.newSessionID() se := newRTSPSession( s.rtspAddress, s.protocols, - visualID, + id, ctx.Session, ctx.Conn, s.pathManager, @@ -335,5 +339,46 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { s.mutex.RLock() se := s.sessions[ctx.Session] s.mutex.RUnlock() - se.OnIncomingFrame(ctx) + se.OnFrame(ctx) +} + +// OnAPIRTSPSessionsList is called by api. +func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes { + select { + case <-s.ctx.Done(): + return apiRTSPSessionsListRes{Err: fmt.Errorf("terminated")} + default: + } + + s.mutex.RLock() + for _, s := range s.sessions { + req.Data.Items = append(req.Data.Items, apiRTSPSessionsItem{ + ID: s.ID(), + RemoteAddr: s.RemoteAddr().String(), + }) + } + s.mutex.RUnlock() + + return apiRTSPSessionsListRes{} +} + +// OnAPIRTSPSessionsKick is called by api. +func (s *rtspServer) OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes { + select { + case <-s.ctx.Done(): + return apiRTSPSessionsKickRes{Err: fmt.Errorf("terminated")} + default: + } + + s.mutex.RLock() + defer s.mutex.RUnlock() + + for _, s := range s.sessions { + if s.ID() == req.ID { + s.Close() + return apiRTSPSessionsKickRes{} + } + } + + return apiRTSPSessionsKickRes{Err: fmt.Errorf("not found")} } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 417c24c1..08a89f86 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -25,8 +25,9 @@ type rtspSessionParent interface { type rtspSession struct { rtspAddress string protocols map[conf.Protocol]struct{} - visualID string + id string ss *gortsplib.ServerSession + author *gortsplib.ServerConn pathManager *pathManager parent rtspSessionParent @@ -38,7 +39,7 @@ type rtspSession struct { func newRTSPSession( rtspAddress string, protocols map[conf.Protocol]struct{}, - visualID string, + id string, ss *gortsplib.ServerSession, sc *gortsplib.ServerConn, pathManager *pathManager, @@ -46,13 +47,14 @@ func newRTSPSession( s := &rtspSession{ rtspAddress: rtspAddress, protocols: protocols, - visualID: visualID, + id: id, ss: ss, + author: sc, pathManager: pathManager, parent: parent, } - s.log(logger.Info, "opened by %v", sc.NetConn().RemoteAddr()) + s.log(logger.Info, "opened by %v", s.author.NetConn().RemoteAddr()) return s } @@ -83,15 +85,17 @@ func (s *rtspSession) Close() { s.ss.Close() } -// IsSource implements source. -func (s *rtspSession) IsSource() {} - // IsRTSPSession implements pathRTSPSession. func (s *rtspSession) IsRTSPSession() {} -// VisualID returns the visual ID of the session. -func (s *rtspSession) VisualID() string { - return s.visualID +// ID returns the public ID of the session. +func (s *rtspSession) ID() string { + return s.id +} + +// RemoteAddr returns the remote address of the author of the session. +func (s *rtspSession) RemoteAddr() net.Addr { + return s.author.NetConn().RemoteAddr() } func (s *rtspSession) displayedProtocol() string { @@ -102,7 +106,7 @@ func (s *rtspSession) displayedProtocol() string { } func (s *rtspSession) log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...) + s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.id}, args...)...) } // OnAnnounce is called by rtspServer. @@ -298,6 +302,22 @@ func (s *rtspSession) OnReaderFrame(trackID int, streamType gortsplib.StreamType s.ss.WriteFrame(trackID, streamType, payload) } +// OnReaderAPIDescribe implements reader. +func (s *rtspSession) OnReaderAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + ID string `json:"id"` + }{"rtspsession", s.id} +} + +// OnSourceAPIDescribe implements source. +func (s *rtspSession) OnSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + ID string `json:"id"` + }{"rtspsession", s.id} +} + // OnPublisherAccepted implements publisher. func (s *rtspSession) OnPublisherAccepted(tracksLen int) { s.log(logger.Info, "is publishing to path '%s', %d %s with %s", @@ -312,8 +332,8 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) { s.displayedProtocol()) } -// OnIncomingFrame is called by rtspServer. -func (s *rtspSession) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { +// OnFrame is called by rtspServer. +func (s *rtspSession) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { if s.ss.State() != gortsplib.ServerSessionStateRecord { return } diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 8efa2909..100b2fee 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -91,12 +91,6 @@ func (s *rtspSource) Close() { s.ctxCancel() } -// IsSource implements source. -func (s *rtspSource) IsSource() {} - -// IsSourceStatic implements sourceStatic. -func (s *rtspSource) IsSourceStatic() {} - func (s *rtspSource) log(level logger.Level, format string, args ...interface{}) { s.parent.Log(level, "[rtsp source] "+format, args...) } @@ -214,3 +208,10 @@ func (s *rtspSource) runInner() bool { return true } } + +// OnSourceAPIDescribe implements source. +func (*rtspSource) OnSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"rtspSource"} +} diff --git a/internal/core/source.go b/internal/core/source.go index efc2cbc9..b4a275f4 100644 --- a/internal/core/source.go +++ b/internal/core/source.go @@ -2,12 +2,11 @@ package core // source is an entity that can provide a stream, statically or dynamically. type source interface { - IsSource() + OnSourceAPIDescribe() interface{} } // sourceStatic is an entity that can provide a static stream. type sourceStatic interface { source - IsSourceStatic() Close() } diff --git a/internal/externalcmd/cmd.go b/internal/externalcmd/cmd.go index 8d64504d..cebbafbf 100644 --- a/internal/externalcmd/cmd.go +++ b/internal/externalcmd/cmd.go @@ -5,7 +5,7 @@ import ( ) const ( - retryPause = 5 * time.Second + restartPause = 5 * time.Second ) // Environment is a Cmd environment. @@ -64,7 +64,7 @@ func (e *Cmd) run() { } select { - case <-time.After(retryPause): + case <-time.After(restartPause): return true case <-e.terminate: return false diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index c20f98e4..48e07762 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -18,6 +18,11 @@ writeTimeout: 10s # a lower number allows to save RAM. readBufferCount: 512 +# enable the HTTP API. +api: no +# address of the API listener. +apiAddress: :9997 + # enable Prometheus-compatible metrics. metrics: no # address of the metrics listener.