Browse Source

implement log levels; print requests and responses when log level is "debug" (#116)

pull/169/head
aler9 5 years ago
parent
commit
74b592b211
  1. 2
      go.mod
  2. 4
      go.sum
  3. 84
      internal/client/client.go
  4. 7
      internal/clientman/clientman.go
  5. 72
      internal/conf/conf.go
  6. 160
      internal/logger/logger.go
  7. 93
      internal/loghandler/loghandler.go
  8. 5
      internal/metrics/metrics.go
  9. 17
      internal/path/path.go
  10. 7
      internal/pathman/pathman.go
  11. 6
      internal/pprof/pprof.go
  12. 6
      internal/servertcp/server.go
  13. 5
      internal/serverudp/server.go
  14. 31
      internal/sourcertmp/source.go
  15. 30
      internal/sourcertsp/source.go
  16. 55
      main.go
  17. 72
      main_test.go
  18. 4
      rtsp-simple-server.yml

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20201206210440-3631309f9fc3
github.com/aler9/gortsplib v0.0.0-20201208105438-07aefbcd5d11
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20201206210440-3631309f9fc3 h1:ord/MYU8Re5dW06oqggQjC0AYrzfGhycWDr8WtJq0H8=
github.com/aler9/gortsplib v0.0.0-20201206210440-3631309f9fc3/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20201208105438-07aefbcd5d11 h1:as97tV7XyNJurmD1e3iT0AcgxeIwRa+nwMm10gi0vO0=
github.com/aler9/gortsplib v0.0.0-20201208105438-07aefbcd5d11/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

84
internal/client/client.go

@ -19,6 +19,7 @@ import ( @@ -19,6 +19,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/serverudp"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -88,7 +89,7 @@ type Path interface { @@ -88,7 +89,7 @@ type Path interface {
// Parent is implemented by clientman.ClientMan.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
OnClientClose(*Client)
OnClientDescribe(*Client, string, *base.Request) (Path, error)
OnClientAnnounce(*Client, string, gortsplib.Tracks, *base.Request) (Path, error)
@ -162,7 +163,7 @@ func New( @@ -162,7 +163,7 @@ func New(
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log("connected")
c.log(logger.Info, "connected")
c.wg.Add(1)
go c.run()
@ -178,8 +179,8 @@ func (c *Client) Close() { @@ -178,8 +179,8 @@ func (c *Client) Close() {
// IsSource implements path.source.
func (c *Client) IsSource() {}
func (c *Client) log(format string, args ...interface{}) {
c.parent.Log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
func (c *Client) ip() net.IP {
@ -198,7 +199,7 @@ var errStateInitial = errors.New("initial") @@ -198,7 +199,7 @@ var errStateInitial = errors.New("initial")
func (c *Client) run() {
defer c.wg.Done()
defer c.log("disconnected")
defer c.log(logger.Info, "disconnected")
if c.runOnConnect != "" {
onConnectCmd := externalcmd.New(c.runOnConnect, c.runOnConnectRestart, externalcmd.Environment{
@ -243,7 +244,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -243,7 +244,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
ip := c.ip()
if !ipEqualOrInRange(ip, ips) {
c.log("ERR: ip '%s' not allowed", ip)
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
@ -274,7 +275,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -274,7 +275,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
c.log("ERR: unauthorized: %s", err)
c.log(logger.Info, "ERR: unauthorized: %s", err)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
@ -286,7 +287,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -286,7 +287,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
}
if c.authFailures > 1 {
c.log("WARN: unauthorized: %s", err)
c.log(logger.Debug, "WARN: unauthorized: %s", err)
}
return errAuthNotCritical{&base.Response{
@ -318,10 +319,15 @@ func (c *Client) checkState(allowed map[state]struct{}) error { @@ -318,10 +319,15 @@ func (c *Client) checkState(allowed map[state]struct{}) error {
allowedList, c.state)
}
func (c *Client) writeRes(res *base.Response) {
c.log(logger.Debug, "s->c %v", res)
c.conn.WriteResponse(res)
}
func (c *Client) writeResError(cseq base.HeaderValue, code base.StatusCode, err error) {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: code,
Header: base.Header{
"CSeq": cseq,
@ -330,7 +336,7 @@ func (c *Client) writeResError(cseq base.HeaderValue, code base.StatusCode, err @@ -330,7 +336,7 @@ func (c *Client) writeResError(cseq base.HeaderValue, code base.StatusCode, err
}
func (c *Client) handleRequest(req *base.Request) error {
c.log(string(req.Method))
c.log(logger.Debug, "[c->s] %v", req)
cseq, ok := req.Header["CSeq"]
if !ok || len(cseq) != 1 {
@ -340,7 +346,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -340,7 +346,7 @@ func (c *Client) handleRequest(req *base.Request) error {
switch req.Method {
case base.Options:
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -360,7 +366,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -360,7 +366,7 @@ func (c *Client) handleRequest(req *base.Request) error {
// GET_PARAMETER is used like a ping
case base.GetParameter:
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -392,12 +398,12 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -392,12 +398,12 @@ func (c *Client) handleRequest(req *base.Request) error {
switch terr := err.(type) {
case errAuthNotCritical:
close(c.describeData)
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return nil
case errAuthCritical:
close(c.describeData)
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return errStateTerminate
default:
@ -462,11 +468,11 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -462,11 +468,11 @@ func (c *Client) handleRequest(req *base.Request) error {
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return nil
case errAuthCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return errStateTerminate
default:
@ -483,7 +489,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -483,7 +489,7 @@ func (c *Client) handleRequest(req *base.Request) error {
c.path = path
c.state = statePreRecord
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -560,11 +566,11 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -560,11 +566,11 @@ func (c *Client) handleRequest(req *base.Request) error {
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return nil
case errAuthCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return errStateTerminate
default:
@ -592,7 +598,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -592,7 +598,7 @@ func (c *Client) handleRequest(req *base.Request) error {
ServerPorts: &[2]int{c.serverUDPRtp.Port(), c.serverUDPRtcp.Port()},
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -618,11 +624,11 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -618,11 +624,11 @@ func (c *Client) handleRequest(req *base.Request) error {
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return nil
case errAuthCritical:
c.conn.WriteResponse(terr.Response)
c.writeRes(terr.Response)
return errStateTerminate
default:
@ -647,7 +653,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -647,7 +653,7 @@ func (c *Client) handleRequest(req *base.Request) error {
InterleavedIds: &interleavedIds,
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -709,7 +715,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -709,7 +715,7 @@ func (c *Client) handleRequest(req *base.Request) error {
ServerPorts: &[2]int{c.serverUDPRtp.Port(), c.serverUDPRtcp.Port()},
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -760,7 +766,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -760,7 +766,7 @@ func (c *Client) handleRequest(req *base.Request) error {
InterleavedIds: &interleavedIds,
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -810,7 +816,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -810,7 +816,7 @@ func (c *Client) handleRequest(req *base.Request) error {
// write response before setting state
// otherwise, in case of TCP connections, RTP packets could be sent
// before the response
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -851,7 +857,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -851,7 +857,7 @@ func (c *Client) handleRequest(req *base.Request) error {
return errStateTerminate
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -872,7 +878,7 @@ func (c *Client) handleRequest(req *base.Request) error { @@ -872,7 +878,7 @@ func (c *Client) handleRequest(req *base.Request) error {
return errStateTerminate
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": cseq,
@ -928,7 +934,7 @@ func (c *Client) runInitial() bool { @@ -928,7 +934,7 @@ func (c *Client) runInitial() bool {
default:
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
}
c.parent.OnClientClose(c)
@ -959,7 +965,7 @@ func (c *Client) runWaitingDescribe() bool { @@ -959,7 +965,7 @@ func (c *Client) runWaitingDescribe() bool {
}
if res.redirect != "" {
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusMovedPermanently,
Header: base.Header{
"CSeq": c.describeCSeq,
@ -969,7 +975,7 @@ func (c *Client) runWaitingDescribe() bool { @@ -969,7 +975,7 @@ func (c *Client) runWaitingDescribe() bool {
return true
}
c.conn.WriteResponse(&base.Response{
c.writeRes(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"CSeq": c.describeCSeq,
@ -1006,7 +1012,7 @@ func (c *Client) runPlay() bool { @@ -1006,7 +1012,7 @@ func (c *Client) runPlay() bool {
c.state = statePlay
c.path.OnClientPlay(c)
c.log("is reading from path '%s', %d %s with %s", c.path.Name(), len(c.streamTracks), func() string {
c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(), len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
@ -1059,7 +1065,7 @@ func (c *Client) runPlayUDP() bool { @@ -1059,7 +1065,7 @@ func (c *Client) runPlayUDP() bool {
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
}
c.path.OnClientRemove(c)
@ -1139,7 +1145,7 @@ func (c *Client) runPlayTCP() bool { @@ -1139,7 +1145,7 @@ func (c *Client) runPlayTCP() bool {
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
}
c.path.OnClientRemove(c)
@ -1189,7 +1195,7 @@ func (c *Client) runRecord() bool { @@ -1189,7 +1195,7 @@ func (c *Client) runRecord() bool {
c.state = stateRecord
c.path.OnClientRecord(c)
c.log("is publishing to path '%s', %d %s with %s", c.path.Name(), len(c.streamTracks), func() string {
c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(), len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
@ -1285,7 +1291,7 @@ func (c *Client) runRecordUDP() bool { @@ -1285,7 +1291,7 @@ func (c *Client) runRecordUDP() bool {
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
}
for _, track := range c.streamTracks {
@ -1328,7 +1334,7 @@ func (c *Client) runRecordUDP() bool { @@ -1328,7 +1334,7 @@ func (c *Client) runRecordUDP() bool {
}
}()
c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)")
c.log(logger.Info, "ERR: no packets received recently (maybe there's a firewall/NAT in between)")
c.conn.Close()
<-readerDone
@ -1426,7 +1432,7 @@ func (c *Client) runRecordTCP() bool { @@ -1426,7 +1432,7 @@ func (c *Client) runRecordTCP() bool {
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
c.log(logger.Info, "ERR: %s", err)
}
c.path.OnClientRemove(c)

7
internal/clientman/clientman.go

@ -8,6 +8,7 @@ import ( @@ -8,6 +8,7 @@ import (
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/servertcp"
"github.com/aler9/rtsp-simple-server/internal/serverudp"
@ -16,7 +17,7 @@ import ( @@ -16,7 +17,7 @@ import (
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
// ClientManager is a client.Client manager.
@ -87,8 +88,8 @@ func (cm *ClientManager) Close() { @@ -87,8 +88,8 @@ func (cm *ClientManager) Close() {
}
// Log is the main logging function.
func (cm *ClientManager) Log(format string, args ...interface{}) {
cm.parent.Log(format, args...)
func (cm *ClientManager) Log(level logger.Level, format string, args ...interface{}) {
cm.parent.Log(level, format, args...)
}
func (cm *ClientManager) run() {

72
internal/conf/conf.go

@ -10,11 +10,16 @@ import ( @@ -10,11 +10,16 @@ import (
"gopkg.in/yaml.v2"
"github.com/aler9/rtsp-simple-server/internal/confenv"
"github.com/aler9/rtsp-simple-server/internal/loghandler"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// Conf is the main program configuration.
type Conf struct {
LogLevel string `yaml:"logLevel"`
LogLevelParsed logger.Level `yaml:"-" json:"-"`
LogDestinations []string `yaml:"logDestinations"`
LogDestinationsParsed map[logger.Destination]struct{} `yaml:"-" json:"-"`
LogFile string `yaml:"logFile"`
Protocols []string `yaml:"protocols"`
ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"`
RtspPort int `yaml:"rtspPort"`
@ -28,13 +33,49 @@ type Conf struct { @@ -28,13 +33,49 @@ type Conf struct {
AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"`
Metrics bool `yaml:"metrics"`
Pprof bool `yaml:"pprof"`
LogDestinations []string `yaml:"logDestinations"`
LogDestinationsParsed map[loghandler.Destination]struct{} `yaml:"-" json:"-"`
LogFile string `yaml:"logFile"`
Paths map[string]*PathConf `yaml:"paths"`
}
func (conf *Conf) fillAndCheck() error {
switch conf.LogLevel {
case "warn":
conf.LogLevelParsed = logger.Warn
case "", "info":
conf.LogLevelParsed = logger.Info
case "debug":
conf.LogLevelParsed = logger.Debug
default:
return fmt.Errorf("unsupported log level: %s", conf.LogLevel)
}
if len(conf.LogDestinations) == 0 {
conf.LogDestinations = []string{"stdout"}
}
conf.LogDestinationsParsed = make(map[logger.Destination]struct{})
for _, dest := range conf.LogDestinations {
switch dest {
case "stdout":
conf.LogDestinationsParsed[logger.DestinationStdout] = struct{}{}
case "file":
conf.LogDestinationsParsed[logger.DestinationFile] = struct{}{}
case "syslog":
conf.LogDestinationsParsed[logger.DestinationSyslog] = struct{}{}
default:
return fmt.Errorf("unsupported log destination: %s", dest)
}
}
if conf.LogFile == "" {
conf.LogFile = "rtsp-simple-server.log"
}
if len(conf.Protocols) == 0 {
conf.Protocols = []string{"udp", "tcp"}
}
@ -94,29 +135,6 @@ func (conf *Conf) fillAndCheck() error { @@ -94,29 +135,6 @@ func (conf *Conf) fillAndCheck() error {
}
}
if len(conf.LogDestinations) == 0 {
conf.LogDestinations = []string{"stdout"}
}
conf.LogDestinationsParsed = make(map[loghandler.Destination]struct{})
for _, dest := range conf.LogDestinations {
switch dest {
case "stdout":
conf.LogDestinationsParsed[loghandler.DestinationStdout] = struct{}{}
case "file":
conf.LogDestinationsParsed[loghandler.DestinationFile] = struct{}{}
case "syslog":
conf.LogDestinationsParsed[loghandler.DestinationSyslog] = struct{}{}
default:
return fmt.Errorf("unsupported log destination: %s", dest)
}
}
if conf.LogFile == "" {
conf.LogFile = "rtsp-simple-server.log"
}
if len(conf.Paths) == 0 {
conf.Paths = map[string]*PathConf{
"all": {},

160
internal/logger/logger.go

@ -0,0 +1,160 @@ @@ -0,0 +1,160 @@
package logger
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/aler9/rtsp-simple-server/internal/syslog"
)
// Level is a log level.
type Level int
const (
Debug Level = iota
Info
Warn
)
// Log levels.
// Destination is a log destination.
type Destination int
const (
// DestinationStdout writes logs to the standard output.
DestinationStdout Destination = iota
// DestinationFile writes logs to a file.
DestinationFile
// DestinationSyslog writes logs to the system logger.
DestinationSyslog
)
// Logger is a log handler.
type Logger struct {
level Level
destinations map[Destination]struct{}
mutex sync.Mutex
buffer []byte
file *os.File
syslog io.WriteCloser
}
// New allocates a log handler.
func New(level Level, destinations map[Destination]struct{}, filePath string) (*Logger, error) {
lh := &Logger{
level: level,
destinations: destinations,
}
if _, ok := destinations[DestinationFile]; ok {
var err error
lh.file, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
lh.Close()
return nil, err
}
}
if _, ok := destinations[DestinationSyslog]; ok {
var err error
lh.syslog, err = syslog.New("rtsp-simple-server")
if err != nil {
lh.Close()
return nil, err
}
}
return lh, nil
}
// Close closes a log handler.
func (lh *Logger) Close() {
if lh.file != nil {
lh.file.Close()
}
if lh.syslog != nil {
lh.syslog.Close()
}
}
// https://golang.org/src/log/log.go#L78
func itoa(buf *[]byte, i int, wid int) {
// Assemble decimal in reverse order.
var b [20]byte
bp := len(b) - 1
for i >= 10 || wid > 1 {
wid--
q := i / 10
b[bp] = byte('0' + i - q*10)
bp--
i = q
}
// i < 10
b[bp] = byte('0' + i)
*buf = append(*buf, b[bp:]...)
}
func (lh *Logger) Log(level Level, format string, args ...interface{}) {
if level < lh.level {
return
}
lh.mutex.Lock()
defer lh.mutex.Unlock()
lh.buffer = lh.buffer[:0]
// date
now := time.Now()
year, month, day := now.Date()
itoa(&lh.buffer, year, 4)
lh.buffer = append(lh.buffer, '/')
itoa(&lh.buffer, int(month), 2)
lh.buffer = append(lh.buffer, '/')
itoa(&lh.buffer, day, 2)
lh.buffer = append(lh.buffer, ' ')
// time
hour, min, sec := now.Clock()
itoa(&lh.buffer, hour, 2)
lh.buffer = append(lh.buffer, ':')
itoa(&lh.buffer, min, 2)
lh.buffer = append(lh.buffer, ':')
itoa(&lh.buffer, sec, 2)
lh.buffer = append(lh.buffer, ' ')
// level
switch level {
case Debug:
lh.buffer = append(lh.buffer, "[D] "...)
case Info:
lh.buffer = append(lh.buffer, "[I] "...)
case Warn:
lh.buffer = append(lh.buffer, "[W] "...)
}
// content
lh.buffer = append(lh.buffer, fmt.Sprintf(format, args...)...)
lh.buffer = append(lh.buffer, '\n')
// output
if _, ok := lh.destinations[DestinationStdout]; ok {
print(string(lh.buffer))
}
if _, ok := lh.destinations[DestinationFile]; ok {
lh.file.Write(lh.buffer)
}
if _, ok := lh.destinations[DestinationSyslog]; ok {
lh.syslog.Write(lh.buffer)
}
}

93
internal/loghandler/loghandler.go

@ -1,93 +0,0 @@ @@ -1,93 +0,0 @@
package loghandler
import (
"io"
"log"
"os"
"github.com/aler9/rtsp-simple-server/internal/syslog"
)
// Destination is a log destination.
type Destination int
const (
// DestinationStdout writes logs to the standard output.
DestinationStdout Destination = iota
// DestinationFile writes logs to a file.
DestinationFile
// DestinationSyslog writes logs to the system logger.
DestinationSyslog
)
type writeFunc func(p []byte) (int, error)
func (f writeFunc) Write(p []byte) (int, error) {
return f(p)
}
// LogHandler is a log handler.
type LogHandler struct {
destinations map[Destination]struct{}
file *os.File
syslog io.WriteCloser
}
// New allocates a log handler.
func New(destinations map[Destination]struct{}, filePath string) (*LogHandler, error) {
lh := &LogHandler{
destinations: destinations,
}
if _, ok := destinations[DestinationFile]; ok {
var err error
lh.file, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
lh.Close()
return nil, err
}
}
if _, ok := destinations[DestinationSyslog]; ok {
var err error
lh.syslog, err = syslog.New("rtsp-simple-server")
if err != nil {
lh.Close()
return nil, err
}
}
log.SetOutput(writeFunc(lh.write))
return lh, nil
}
// Close closes a log handler.
func (lh *LogHandler) Close() {
if lh.file != nil {
lh.file.Close()
}
if lh.syslog != nil {
lh.syslog.Close()
}
}
func (lh *LogHandler) write(p []byte) (int, error) {
if _, ok := lh.destinations[DestinationStdout]; ok {
print(string(p))
}
if _, ok := lh.destinations[DestinationFile]; ok {
lh.file.Write(p)
}
if _, ok := lh.destinations[DestinationSyslog]; ok {
lh.syslog.Write(p)
}
return len(p), nil
}

5
internal/metrics/metrics.go

@ -9,6 +9,7 @@ import ( @@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -18,7 +19,7 @@ const ( @@ -18,7 +19,7 @@ const (
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
// Metrics is a metrics exporter.
@ -49,7 +50,7 @@ func New(stats *stats.Stats, parent Parent) (*Metrics, error) { @@ -49,7 +50,7 @@ func New(stats *stats.Stats, parent Parent) (*Metrics, error) {
Handler: m.mux,
}
parent.Log("[metrics] opened on " + address)
parent.Log(logger.Info, "[metrics] opened on "+address)
go m.run()
return m, nil

17
internal/path/path.go

@ -14,6 +14,7 @@ import ( @@ -14,6 +14,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/sourcertmp"
"github.com/aler9/rtsp-simple-server/internal/sourcertsp"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -27,7 +28,7 @@ func newEmptyTimer() *time.Timer { @@ -27,7 +28,7 @@ func newEmptyTimer() *time.Timer {
// Parent is implemented by pathman.PathMan.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
OnPathClose(*Path)
OnPathClientClose(*client.Client)
}
@ -230,8 +231,8 @@ func (pa *Path) Close() { @@ -230,8 +231,8 @@ func (pa *Path) Close() {
}
// Log is the main logging function.
func (pa *Path) Log(format string, args ...interface{}) {
pa.parent.Log("[path "+pa.name+"] "+format, args...)
func (pa *Path) Log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
}
func (pa *Path) run() {
@ -246,7 +247,7 @@ func (pa *Path) run() { @@ -246,7 +247,7 @@ func (pa *Path) run() {
var onInitCmd *externalcmd.Cmd
if pa.conf.RunOnInit != "" {
pa.Log("on init command started")
pa.Log(logger.Info, "on init command started")
onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{
Path: pa.name,
Port: strconv.FormatInt(int64(pa.rtspPort), 10),
@ -280,7 +281,7 @@ outer: @@ -280,7 +281,7 @@ outer:
case <-pa.runOnDemandCloseTimer.C:
pa.runOnDemandCloseTimerStarted = false
pa.Log("on demand command stopped")
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
@ -364,7 +365,7 @@ outer: @@ -364,7 +365,7 @@ outer:
pa.closeTimer.Stop()
if onInitCmd != nil {
pa.Log("on init command stopped")
pa.Log(logger.Info, "on init command stopped")
onInitCmd.Close()
}
@ -374,7 +375,7 @@ outer: @@ -374,7 +375,7 @@ outer:
pa.sourceWg.Wait()
if pa.onDemandCmd != nil {
pa.Log("on demand command stopped")
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
}
@ -609,7 +610,7 @@ func (pa *Path) onClientDescribe(c *client.Client) { @@ -609,7 +610,7 @@ func (pa *Path) onClientDescribe(c *client.Client) {
// start on-demand command
if pa.conf.RunOnDemand != "" {
if pa.onDemandCmd == nil {
pa.Log("on demand command started")
pa.Log(logger.Info, "on demand command started")
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name,
Port: strconv.FormatInt(int64(pa.rtspPort), 10),

7
internal/pathman/pathman.go

@ -11,13 +11,14 @@ import ( @@ -11,13 +11,14 @@ import (
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/path"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
// PathManager is a path.Path manager.
@ -92,8 +93,8 @@ func (pm *PathManager) Close() { @@ -92,8 +93,8 @@ func (pm *PathManager) Close() {
}
// Log is the main logging function.
func (pm *PathManager) Log(format string, args ...interface{}) {
pm.parent.Log(format, args...)
func (pm *PathManager) Log(level logger.Level, format string, args ...interface{}) {
pm.parent.Log(level, format, args...)
}
func (pm *PathManager) run() {

6
internal/pprof/pprof.go

@ -7,6 +7,8 @@ import ( @@ -7,6 +7,8 @@ import (
// start pprof
_ "net/http/pprof"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
const (
@ -15,7 +17,7 @@ const ( @@ -15,7 +17,7 @@ const (
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
// Pprof is a performance metrics exporter.
@ -39,7 +41,7 @@ func New(parent Parent) (*Pprof, error) { @@ -39,7 +41,7 @@ func New(parent Parent) (*Pprof, error) {
Handler: http.DefaultServeMux,
}
parent.Log("[pprof] opened on " + address)
parent.Log(logger.Info, "[pprof] opened on "+address)
go pp.run()
return pp, nil

6
internal/servertcp/server.go

@ -5,11 +5,13 @@ import ( @@ -5,11 +5,13 @@ import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
// Server is a RTSP TCP server.
@ -47,7 +49,7 @@ func New(port int, @@ -47,7 +49,7 @@ func New(port int,
done: make(chan struct{}),
}
parent.Log("[TCP server] opened on :%d", port)
parent.Log(logger.Info, "[TCP server] opened on :%d", port)
go s.run()
return s, nil

5
internal/serverudp/server.go

@ -8,6 +8,7 @@ import ( @@ -8,6 +8,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/multibuffer"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
const (
@ -29,7 +30,7 @@ type publisherData struct { @@ -29,7 +30,7 @@ type publisherData struct {
// Parent is implemented by program.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
}
type publisherAddr struct {
@ -96,7 +97,7 @@ func New(writeTimeout time.Duration, @@ -96,7 +97,7 @@ func New(writeTimeout time.Duration,
} else {
label = "RTCP"
}
parent.Log("[UDP/"+label+" server] opened on :%d", port)
parent.Log(logger.Info, "[UDP/"+label+" server] opened on :%d", port)
go s.run()
return s, nil

31
internal/sourcertmp/source.go

@ -15,6 +15,7 @@ import ( @@ -15,6 +15,7 @@ import (
"github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -25,7 +26,7 @@ const ( @@ -25,7 +26,7 @@ const (
// Parent is implemeneted by path.Path.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
OnSourceSetReady(gortsplib.Tracks)
OnSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte)
@ -56,7 +57,7 @@ func New(ur string, @@ -56,7 +57,7 @@ func New(ur string,
}
atomic.AddInt64(s.stats.CountSourcesRtmp, +1)
s.parent.Log("rtmp source started")
s.log(logger.Info, "started")
s.wg.Add(1)
go s.run()
@ -66,7 +67,7 @@ func New(ur string, @@ -66,7 +67,7 @@ func New(ur string,
// Close closes a Source.
func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
s.parent.Log("rtmp source stopped")
s.log(logger.Info, "stopped")
close(s.terminate)
}
@ -76,6 +77,10 @@ func (s *Source) IsSource() {} @@ -76,6 +77,10 @@ func (s *Source) IsSource() {}
// IsSourceExternal implements path.sourceExternal.
func (s *Source) IsSourceExternal() {}
func (s *Source) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtmp source] "+format, args...)
}
func (s *Source) run() {
defer s.wg.Done()
@ -103,7 +108,7 @@ func (s *Source) run() { @@ -103,7 +108,7 @@ func (s *Source) run() {
}
func (s *Source) runInner() bool {
s.parent.Log("connecting to rtmp source")
s.log(logger.Info, "connecting")
var conn *rtmp.Conn
var nconn net.Conn
@ -121,7 +126,7 @@ func (s *Source) runInner() bool { @@ -121,7 +126,7 @@ func (s *Source) runInner() bool {
}
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -179,7 +184,7 @@ func (s *Source) runInner() bool { @@ -179,7 +184,7 @@ func (s *Source) runInner() bool {
}
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -196,7 +201,7 @@ func (s *Source) runInner() bool { @@ -196,7 +201,7 @@ func (s *Source) runInner() bool {
if h264Sps != nil {
videoTrack, err = gortsplib.NewTrackH264(96, h264Sps, h264Pps)
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -205,7 +210,7 @@ func (s *Source) runInner() bool { @@ -205,7 +210,7 @@ func (s *Source) runInner() bool {
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -215,7 +220,7 @@ func (s *Source) runInner() bool { @@ -215,7 +220,7 @@ func (s *Source) runInner() bool {
if aacConfig != nil {
audioTrack, err = gortsplib.NewTrackAAC(96, aacConfig)
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -224,7 +229,7 @@ func (s *Source) runInner() bool { @@ -224,7 +229,7 @@ func (s *Source) runInner() bool {
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
@ -232,7 +237,7 @@ func (s *Source) runInner() bool { @@ -232,7 +237,7 @@ func (s *Source) runInner() bool {
}
if len(tracks) == 0 {
s.parent.Log("rtmp source ERR: no tracks found")
s.log(logger.Info, "ERR: no tracks found")
return true
}
@ -240,7 +245,7 @@ func (s *Source) runInner() bool { @@ -240,7 +245,7 @@ func (s *Source) runInner() bool {
t.ID = i
}
s.parent.Log("rtmp source ready")
s.log(logger.Info, "ready")
s.parent.OnSourceSetReady(tracks)
defer s.parent.OnSourceSetNotReady()
@ -348,7 +353,7 @@ func (s *Source) runInner() bool { @@ -348,7 +353,7 @@ func (s *Source) runInner() bool {
case err := <-readerDone:
nconn.Close()
s.parent.Log("rtmp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
close(rtcpTerminate)
<-rtcpDone

30
internal/sourcertsp/source.go

@ -6,7 +6,9 @@ import ( @@ -6,7 +6,9 @@ import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -16,7 +18,7 @@ const ( @@ -16,7 +18,7 @@ const (
// Parent is implemented by path.Path.
type Parent interface {
Log(string, ...interface{})
Log(logger.Level, string, ...interface{})
OnSourceSetReady(gortsplib.Tracks)
OnSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte)
@ -56,7 +58,7 @@ func New(ur string, @@ -56,7 +58,7 @@ func New(ur string,
}
atomic.AddInt64(s.stats.CountSourcesRtsp, +1)
s.parent.Log("rtsp source started")
s.log(logger.Info, "started")
s.wg.Add(1)
go s.run()
@ -66,7 +68,7 @@ func New(ur string, @@ -66,7 +68,7 @@ func New(ur string,
// Close closes a Source.
func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRtsp, -1)
s.parent.Log("rtsp source stopped")
s.log(logger.Info, "stopped")
close(s.terminate)
}
@ -76,6 +78,10 @@ func (s *Source) IsSource() {} @@ -76,6 +78,10 @@ func (s *Source) IsSource() {}
// IsSourceExternal implements path.sourceExternal.
func (s *Source) IsSourceExternal() {}
func (s *Source) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtsp source] "+format, args...)
}
func (s *Source) run() {
defer s.wg.Done()
@ -103,7 +109,7 @@ func (s *Source) run() { @@ -103,7 +109,7 @@ func (s *Source) run() {
}
func (s *Source) runInner() bool {
s.parent.Log("connecting to rtsp source")
s.log(logger.Info, "connecting")
var conn *gortsplib.ClientConn
var err error
@ -111,13 +117,19 @@ func (s *Source) runInner() bool { @@ -111,13 +117,19 @@ func (s *Source) runInner() bool {
go func() {
defer close(dialDone)
dialer := gortsplib.ClientConf{
conf := gortsplib.ClientConf{
StreamProtocol: s.proto,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
ReadBufferCount: 2,
OnRequest: func(req *base.Request) {
s.log(logger.Debug, "c->s %v", req)
},
OnResponse: func(res *base.Response) {
s.log(logger.Debug, "s->c %v", res)
},
}
conn, err = dialer.DialRead(s.ur)
conn, err = conf.DialRead(s.ur)
}()
select {
@ -127,13 +139,13 @@ func (s *Source) runInner() bool { @@ -127,13 +139,13 @@ func (s *Source) runInner() bool {
}
if err != nil {
s.parent.Log("rtsp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
tracks := conn.Tracks()
s.parent.Log("rtsp source ready")
s.log(logger.Info, "ready")
s.parent.OnSourceSetReady(tracks)
defer s.parent.OnSourceSetNotReady()
@ -150,7 +162,7 @@ func (s *Source) runInner() bool { @@ -150,7 +162,7 @@ func (s *Source) runInner() bool {
case err := <-readerDone:
conn.Close()
s.parent.Log("rtsp source ERR: %s", err)
s.log(logger.Info, "ERR: %s", err)
return true
}
}

55
main.go

@ -2,7 +2,6 @@ package main @@ -2,7 +2,6 @@ package main
import (
"fmt"
"log"
"os"
"reflect"
"sync/atomic"
@ -13,7 +12,7 @@ import ( @@ -13,7 +12,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/clientman"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/confwatcher"
"github.com/aler9/rtsp-simple-server/internal/loghandler"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/metrics"
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/pprof"
@ -29,7 +28,7 @@ type program struct { @@ -29,7 +28,7 @@ type program struct {
conf *conf.Conf
confFound bool
stats *stats.Stats
logHandler *loghandler.LogHandler
logger *logger.Logger
metrics *metrics.Metrics
pprof *pprof.Pprof
serverUDPRtp *serverudp.Server
@ -43,7 +42,7 @@ type program struct { @@ -43,7 +42,7 @@ type program struct {
done chan struct{}
}
func newProgram(args []string) (*program, error) {
func newProgram(args []string) (*program, bool) {
k := kingpin.New("rtsp-simple-server",
"rtsp-simple-server "+version+"\n\nRTSP server.")
@ -66,26 +65,29 @@ func newProgram(args []string) (*program, error) { @@ -66,26 +65,29 @@ func newProgram(args []string) (*program, error) {
var err error
p.conf, p.confFound, err = conf.Load(p.confPath)
if err != nil {
return nil, err
p.Log(logger.Info, "ERR: %s", err)
return nil, false
}
err = p.createDynamicResources(true)
if err != nil {
p.Log(logger.Info, "ERR: %s", err)
p.closeAllResources()
return nil, err
return nil, false
}
if p.confFound {
p.confWatcher, err = confwatcher.New(p.confPath)
if err != nil {
p.Log(logger.Info, "ERR: %s", err)
p.closeAllResources()
return nil, err
return nil, false
}
}
go p.run()
return p, nil
return p, true
}
func (p *program) close() {
@ -93,12 +95,12 @@ func (p *program) close() { @@ -93,12 +95,12 @@ func (p *program) close() {
<-p.done
}
func (p *program) Log(format string, args ...interface{}) {
func (p *program) Log(level logger.Level, format string, args ...interface{}) {
countClients := atomic.LoadInt64(p.stats.CountClients)
countPublishers := atomic.LoadInt64(p.stats.CountPublishers)
countReaders := atomic.LoadInt64(p.stats.CountReaders)
log.Printf("[%d/%d/%d] "+format, append([]interface{}{countClients,
p.logger.Log(level, "[%d/%d/%d] "+format, append([]interface{}{countClients,
countPublishers, countReaders}, args...)...)
}
@ -118,7 +120,7 @@ outer: @@ -118,7 +120,7 @@ outer:
case <-confChanged:
err := p.reloadConf()
if err != nil {
p.Log("ERR: %s", err)
p.Log(logger.Info, "ERR: %s", err)
break outer
}
@ -137,18 +139,19 @@ func (p *program) createDynamicResources(initial bool) error { @@ -137,18 +139,19 @@ func (p *program) createDynamicResources(initial bool) error {
p.stats = stats.New()
}
if p.logHandler == nil {
p.logHandler, err = loghandler.New(p.conf.LogDestinationsParsed, p.conf.LogFile)
if p.logger == nil {
p.logger, err = logger.New(p.conf.LogLevelParsed, p.conf.LogDestinationsParsed,
p.conf.LogFile)
if err != nil {
return err
}
}
if initial {
p.Log("rtsp-simple-server %s", version)
p.Log(logger.Info, "rtsp-simple-server %s", version)
if !p.confFound {
p.Log("configuration file not found, using the default one")
p.Log(logger.Warn, "configuration file not found, using the default one")
}
}
@ -245,23 +248,23 @@ func (p *program) closeAllResources() { @@ -245,23 +248,23 @@ func (p *program) closeAllResources() {
p.pprof.Close()
}
if p.logHandler != nil {
p.logHandler.Close()
if p.logger != nil {
p.logger.Close()
}
}
func (p *program) reloadConf() error {
p.Log("reloading configuration")
p.Log(logger.Info, "reloading configuration")
conf, _, err := conf.Load(p.confPath)
if err != nil {
return err
}
closeLogHandler := false
closeLogger := false
if !reflect.DeepEqual(conf.LogDestinationsParsed, p.conf.LogDestinationsParsed) ||
conf.LogFile != p.conf.LogFile {
closeLogHandler = true
closeLogger = true
}
closeMetrics := false
@ -353,9 +356,9 @@ func (p *program) reloadConf() error { @@ -353,9 +356,9 @@ func (p *program) reloadConf() error {
p.metrics = nil
}
if closeLogHandler {
p.logHandler.Close()
p.logHandler = nil
if closeLogger {
p.logger.Close()
p.logger = nil
}
p.conf = conf
@ -363,9 +366,9 @@ func (p *program) reloadConf() error { @@ -363,9 +366,9 @@ func (p *program) reloadConf() error {
}
func main() {
p, err := newProgram(os.Args[1:])
if err != nil {
log.Fatal("ERR: ", err)
p, ok := newProgram(os.Args[1:])
if !ok {
os.Exit(1)
}
<-p.done

72
main_test.go

@ -102,14 +102,14 @@ func (c *container) ip() string { @@ -102,14 +102,14 @@ func (c *container) ip() string {
return string(out[:len(out)-1])
}
func testProgram(conf string) (*program, error) {
func testProgram(conf string) (*program, bool) {
if conf == "" {
return newProgram([]string{})
}
tmpf, err := ioutil.TempFile(os.TempDir(), "rtsp-")
if err != nil {
return nil, err
return nil, false
}
defer os.Remove(tmpf.Name())
@ -158,8 +158,8 @@ func TestEnvironment(t *testing.T) { @@ -158,8 +158,8 @@ func TestEnvironment(t *testing.T) {
os.Setenv("RTSP_PATHS_CAM1_SOURCEONDEMAND", "yes")
defer os.Unsetenv("RTSP_PATHS_CAM1_SOURCEONDEMAND")
p, err := testProgram("")
require.NoError(t, err)
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
require.Equal(t, "test=cmd", p.conf.RunOnConnect)
@ -217,8 +217,8 @@ func TestEnvironmentNoFile(t *testing.T) { @@ -217,8 +217,8 @@ func TestEnvironmentNoFile(t *testing.T) {
os.Setenv("RTSP_PATHS_CAM1_SOURCE", "rtsp://testing")
defer os.Unsetenv("RTSP_PATHS_CAM1_SOURCE")
p, err := testProgram("{}")
require.NoError(t, err)
p, ok := testProgram("{}")
require.Equal(t, true, ok)
defer p.close()
pa, ok := p.conf.Paths["cam1"]
@ -244,8 +244,8 @@ func TestPublish(t *testing.T) { @@ -244,8 +244,8 @@ func TestPublish(t *testing.T) {
{"gstreamer", "tcp"},
} {
t.Run(conf.publishSoft+"_"+conf.publishProto, func(t *testing.T) {
p, err := testProgram("")
require.NoError(t, err)
p, ok := testProgram("readTimeout: 20s")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -303,8 +303,8 @@ func TestRead(t *testing.T) { @@ -303,8 +303,8 @@ func TestRead(t *testing.T) {
{"vlc", "tcp"},
} {
t.Run(conf.readSoft+"_"+conf.readProto, func(t *testing.T) {
p, err := testProgram("")
require.NoError(t, err)
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -355,8 +355,8 @@ func TestRead(t *testing.T) { @@ -355,8 +355,8 @@ func TestRead(t *testing.T) {
}
func TestTCPOnly(t *testing.T) {
p, err := testProgram("protocols: [tcp]\n")
require.NoError(t, err)
p, ok := testProgram("protocols: [tcp]\n")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -387,8 +387,8 @@ func TestTCPOnly(t *testing.T) { @@ -387,8 +387,8 @@ func TestTCPOnly(t *testing.T) {
}
func TestPathWithSlash(t *testing.T) {
p, err := testProgram("")
require.NoError(t, err)
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -419,8 +419,8 @@ func TestPathWithSlash(t *testing.T) { @@ -419,8 +419,8 @@ func TestPathWithSlash(t *testing.T) {
}
func TestPathWithQuery(t *testing.T) {
p, err := testProgram("")
require.NoError(t, err)
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -452,12 +452,12 @@ func TestPathWithQuery(t *testing.T) { @@ -452,12 +452,12 @@ func TestPathWithQuery(t *testing.T) {
func TestAuth(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, err := testProgram("paths:\n" +
p, ok := testProgram("paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: test!$()*+.;<=>[]^_-{}\n" +
" publishIps: [172.17.0.0/16]\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -494,12 +494,12 @@ func TestAuth(t *testing.T) { @@ -494,12 +494,12 @@ func TestAuth(t *testing.T) {
"vlc",
} {
t.Run("read_"+soft, func(t *testing.T) {
p, err := testProgram("paths:\n" +
p, ok := testProgram("paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: test!$()*+.;<=>[]^_-{}\n" +
" readIps: [172.17.0.0/16]\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -545,10 +545,10 @@ func TestAuth(t *testing.T) { @@ -545,10 +545,10 @@ func TestAuth(t *testing.T) {
}
func TestAuthIpFail(t *testing.T) {
p, err := testProgram("paths:\n" +
p, ok := testProgram("paths:\n" +
" all:\n" +
" publishIps: [127.0.0.1/32]\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -574,11 +574,11 @@ func TestSourceRtsp(t *testing.T) { @@ -574,11 +574,11 @@ func TestSourceRtsp(t *testing.T) {
"tcp",
} {
t.Run(proto, func(t *testing.T) {
p1, err := testProgram("paths:\n" +
p1, ok := testProgram("paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1 * time.Second)
@ -597,7 +597,7 @@ func TestSourceRtsp(t *testing.T) { @@ -597,7 +597,7 @@ func TestSourceRtsp(t *testing.T) {
time.Sleep(1 * time.Second)
p2, err := testProgram("rtspPort: 8555\n" +
p2, ok := testProgram("rtspPort: 8555\n" +
"rtpPort: 8100\n" +
"rtcpPort: 8101\n" +
"\n" +
@ -606,7 +606,7 @@ func TestSourceRtsp(t *testing.T) { @@ -606,7 +606,7 @@ func TestSourceRtsp(t *testing.T) {
" source: rtsp://testuser:testpass@localhost:8554/teststream\n" +
" sourceProtocol: " + proto + "\n" +
" sourceOnDemand: yes\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p2.close()
time.Sleep(1 * time.Second)
@ -646,11 +646,11 @@ func TestSourceRtmp(t *testing.T) { @@ -646,11 +646,11 @@ func TestSourceRtmp(t *testing.T) {
time.Sleep(1 * time.Second)
p, err := testProgram("paths:\n" +
p, ok := testProgram("paths:\n" +
" proxied:\n" +
" source: rtmp://" + cnt1.ip() + "/stream/test\n" +
" sourceOnDemand: yes\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
@ -669,12 +669,12 @@ func TestSourceRtmp(t *testing.T) { @@ -669,12 +669,12 @@ func TestSourceRtmp(t *testing.T) {
}
func TestRedirect(t *testing.T) {
p1, err := testProgram("paths:\n" +
p1, ok := testProgram("paths:\n" +
" path1:\n" +
" source: redirect\n" +
" sourceRedirect: rtsp://" + ownDockerIP + ":8554/path2\n" +
" path2:\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1 * time.Second)
@ -707,11 +707,11 @@ func TestRedirect(t *testing.T) { @@ -707,11 +707,11 @@ func TestRedirect(t *testing.T) {
}
func TestFallback(t *testing.T) {
p1, err := testProgram("paths:\n" +
p1, ok := testProgram("paths:\n" +
" path1:\n" +
" fallback: rtsp://" + ownDockerIP + ":8554/path2\n" +
" path2:\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1 * time.Second)
@ -744,10 +744,10 @@ func TestFallback(t *testing.T) { @@ -744,10 +744,10 @@ func TestFallback(t *testing.T) {
}
func TestRunOnDemand(t *testing.T) {
p1, err := testProgram("paths:\n" +
p1, ok := testProgram("paths:\n" +
" all:\n" +
" runOnDemand: ffmpeg -hide_banner -loglevel error -re -i testimages/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:$RTSP_PORT/$RTSP_PATH\n")
require.NoError(t, err)
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1 * time.Second)
@ -778,8 +778,8 @@ func TestHotReloading(t *testing.T) { @@ -778,8 +778,8 @@ func TestHotReloading(t *testing.T) {
require.NoError(t, err)
defer os.Remove(confPath)
p, err := newProgram([]string{confPath})
require.NoError(t, err)
p, ok := newProgram([]string{confPath})
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)

4
rtsp-simple-server.yml

@ -1,4 +1,6 @@ @@ -1,4 +1,6 @@
# sets the verbosity of the program; available values are "warn", "info", "debug"
logLevel: info
# destinations of log messages; available values are "stdout", "file" and "syslog".
logDestinations: [stdout]
# if "file" is in logDestinations, this is the file which will receive the logs.
@ -49,7 +51,9 @@ paths: @@ -49,7 +51,9 @@ paths:
# if the source is an RTSP url, this is the protocol that will be used to
# pull the stream. available options are "automatic", "udp", "tcp".
# the tcp protocol can help to overcome the error "no packets received recently".
sourceProtocol: automatic
# if the source is an RTSP or RTMP url, it will be pulled only when at least
# one reader is connected, saving bandwidth.
sourceOnDemand: no

Loading…
Cancel
Save