Browse Source

new parameter readBufferCount

pull/181/head v0.13.2
aler9 5 years ago
parent
commit
97305af272
  1. 10
      internal/clientman/clientman.go
  2. 4
      internal/conf/conf.go
  3. 10
      internal/confenv/confenv.go
  4. 38
      internal/path/path.go
  5. 56
      internal/pathman/pathman.go
  6. 2
      internal/serverplain/server.go
  7. 8
      internal/servertls/server.go
  8. 35
      internal/sourcertsp/source.go
  9. 22
      main.go
  10. 4
      rtsp-simple-server.yml

10
internal/clientman/clientman.go

@ -29,7 +29,7 @@ type ClientManager struct {
protocols map[base.StreamProtocol]struct{} protocols map[base.StreamProtocol]struct{}
stats *stats.Stats stats *stats.Stats
pathMan *pathman.PathManager pathMan *pathman.PathManager
serverTCP *serverplain.Server serverPlain *serverplain.Server
serverTLS *servertls.Server serverTLS *servertls.Server
parent Parent parent Parent
@ -53,7 +53,7 @@ func New(
protocols map[base.StreamProtocol]struct{}, protocols map[base.StreamProtocol]struct{},
stats *stats.Stats, stats *stats.Stats,
pathMan *pathman.PathManager, pathMan *pathman.PathManager,
serverTCP *serverplain.Server, serverPlain *serverplain.Server,
serverTLS *servertls.Server, serverTLS *servertls.Server,
parent Parent) *ClientManager { parent Parent) *ClientManager {
@ -65,7 +65,7 @@ func New(
protocols: protocols, protocols: protocols,
stats: stats, stats: stats,
pathMan: pathMan, pathMan: pathMan,
serverTCP: serverTCP, serverPlain: serverPlain,
serverTLS: serverTLS, serverTLS: serverTLS,
parent: parent, parent: parent,
clients: make(map[*client.Client]struct{}), clients: make(map[*client.Client]struct{}),
@ -93,8 +93,8 @@ func (cm *ClientManager) run() {
defer close(cm.done) defer close(cm.done)
tcpAccept := func() chan *gortsplib.ServerConn { tcpAccept := func() chan *gortsplib.ServerConn {
if cm.serverTCP != nil { if cm.serverPlain != nil {
return cm.serverTCP.Accept() return cm.serverPlain.Accept()
} }
return make(chan *gortsplib.ServerConn) return make(chan *gortsplib.ServerConn)
}() }()

4
internal/conf/conf.go

@ -44,6 +44,7 @@ type Conf struct {
AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"`
ReadTimeout time.Duration `yaml:"readTimeout"` ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"`
ReadBufferCount uint64 `yaml:"readBufferCount"`
Metrics bool `yaml:"metrics"` Metrics bool `yaml:"metrics"`
Pprof bool `yaml:"pprof"` Pprof bool `yaml:"pprof"`
RunOnConnect string `yaml:"runOnConnect"` RunOnConnect string `yaml:"runOnConnect"`
@ -182,6 +183,9 @@ func (conf *Conf) fillAndCheck() error {
if conf.WriteTimeout == 0 { if conf.WriteTimeout == 0 {
conf.WriteTimeout = 10 * time.Second conf.WriteTimeout = 10 * time.Second
} }
if conf.ReadBufferCount == 0 {
conf.ReadBufferCount = 512
}
if len(conf.Paths) == 0 { if len(conf.Paths) == 0 {
conf.Paths = map[string]*PathConf{ conf.Paths = map[string]*PathConf{

10
internal/confenv/confenv.go vendored

@ -41,6 +41,16 @@ func load(env map[string]string, envKey string, rv reflect.Value) error {
} }
return nil return nil
case reflect.Uint64:
if ev, ok := env[envKey]; ok {
iv, err := strconv.ParseUint(ev, 10, 64)
if err != nil {
return fmt.Errorf("%s: %s", envKey, err)
}
rv.SetUint(iv)
}
return nil
case reflect.Bool: case reflect.Bool:
if ev, ok := env[envKey]; ok { if ev, ok := env[envKey]; ok {
switch strings.ToLower(ev) { switch strings.ToLower(ev) {

38
internal/path/path.go

@ -140,15 +140,16 @@ const (
// Path is a path. // Path is a path.
type Path struct { type Path struct {
rtspPort int rtspPort int
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
confName string readBufferCount uint64
conf *conf.PathConf confName string
name string conf *conf.PathConf
wg *sync.WaitGroup name string
stats *stats.Stats wg *sync.WaitGroup
parent Parent stats *stats.Stats
parent Parent
clients map[*client.Client]clientState clients map[*client.Client]clientState
clientsWg sync.WaitGroup clientsWg sync.WaitGroup
@ -185,6 +186,7 @@ func New(
rtspPort int, rtspPort int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
readBufferCount uint64,
confName string, confName string,
conf *conf.PathConf, conf *conf.PathConf,
name string, name string,
@ -196,6 +198,7 @@ func New(
rtspPort: rtspPort, rtspPort: rtspPort,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
confName: confName, confName: confName,
conf: conf, conf: conf,
name: name, name: name,
@ -482,11 +485,22 @@ func (pa *Path) hasExternalSource() bool {
func (pa *Path) startExternalSource() { func (pa *Path) startExternalSource() {
if strings.HasPrefix(pa.conf.Source, "rtsp://") || if strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") { strings.HasPrefix(pa.conf.Source, "rtsps://") {
pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed, pa.source = sourcertsp.New(
pa.readTimeout, pa.writeTimeout, &pa.sourceWg, pa.stats, pa) pa.conf.Source,
pa.conf.SourceProtocolParsed,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
&pa.sourceWg,
pa.stats,
pa)
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") { } else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
pa.source = sourcertmp.New(pa.conf.Source, &pa.sourceWg, pa.stats, pa) pa.source = sourcertmp.New(
pa.conf.Source,
&pa.sourceWg,
pa.stats,
pa)
} }
} }

56
internal/pathman/pathman.go

@ -23,13 +23,14 @@ type Parent interface {
// PathManager is a path.Path manager. // PathManager is a path.Path manager.
type PathManager struct { type PathManager struct {
rtspPort int rtspPort int
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
authMethods []headers.AuthMethod readBufferCount uint64
pathConfs map[string]*conf.PathConf authMethods []headers.AuthMethod
stats *stats.Stats pathConfs map[string]*conf.PathConf
parent Parent stats *stats.Stats
parent Parent
paths map[string]*path.Path paths map[string]*path.Path
wg sync.WaitGroup wg sync.WaitGroup
@ -52,6 +53,7 @@ func New(
rtspPort int, rtspPort int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
readBufferCount uint64,
authMethods []headers.AuthMethod, authMethods []headers.AuthMethod,
pathConfs map[string]*conf.PathConf, pathConfs map[string]*conf.PathConf,
stats *stats.Stats, stats *stats.Stats,
@ -61,6 +63,7 @@ func New(
rtspPort: rtspPort, rtspPort: rtspPort,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
authMethods: authMethods, authMethods: authMethods,
pathConfs: pathConfs, pathConfs: pathConfs,
stats: stats, stats: stats,
@ -160,8 +163,17 @@ outer:
// create path if it doesn't exist // create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok { if _, ok := pm.paths[req.PathName]; !ok {
pa := path.New(pm.rtspPort, pm.readTimeout, pm.writeTimeout, pa := path.New(
pathName, pathConf, req.PathName, &pm.wg, pm.stats, pm) pm.rtspPort,
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pathName,
pathConf,
req.PathName,
&pm.wg,
pm.stats,
pm)
pm.paths[req.PathName] = pa pm.paths[req.PathName] = pa
} }
@ -183,8 +195,17 @@ outer:
// create path if it doesn't exist // create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok { if _, ok := pm.paths[req.PathName]; !ok {
pa := path.New(pm.rtspPort, pm.readTimeout, pm.writeTimeout, pa := path.New(
pathName, pathConf, req.PathName, &pm.wg, pm.stats, pm) pm.rtspPort,
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pathName,
pathConf,
req.PathName,
&pm.wg,
pm.stats,
pm)
pm.paths[req.PathName] = pa pm.paths[req.PathName] = pa
} }
@ -257,8 +278,17 @@ outer:
func (pm *PathManager) createPaths() { func (pm *PathManager) createPaths() {
for pathName, pathConf := range pm.pathConfs { for pathName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil { if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil {
pa := path.New(pm.rtspPort, pm.readTimeout, pm.writeTimeout, pa := path.New(
pathName, pathConf, pathName, &pm.wg, pm.stats, pm) pm.rtspPort,
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pathName,
pathConf,
pathName,
&pm.wg,
pm.stats,
pm)
pm.paths[pathName] = pa pm.paths[pathName] = pa
} }
} }

2
internal/serverplain/server.go

@ -29,6 +29,7 @@ type Server struct {
func New(port int, func New(port int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
readBufferCount uint64,
udpRTPListener *gortsplib.ServerUDPListener, udpRTPListener *gortsplib.ServerUDPListener,
udpRTCPListener *gortsplib.ServerUDPListener, udpRTCPListener *gortsplib.ServerUDPListener,
parent Parent) (*Server, error) { parent Parent) (*Server, error) {
@ -36,6 +37,7 @@ func New(port int,
conf := gortsplib.ServerConf{ conf := gortsplib.ServerConf{
ReadTimeout: readTimeout, ReadTimeout: readTimeout,
WriteTimeout: writeTimeout, WriteTimeout: writeTimeout,
ReadBufferCount: readBufferCount,
UDPRTPListener: udpRTPListener, UDPRTPListener: udpRTPListener,
UDPRTCPListener: udpRTCPListener, UDPRTCPListener: udpRTCPListener,
} }

8
internal/servertls/server.go

@ -30,6 +30,7 @@ type Server struct {
func New(port int, func New(port int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
readBufferCount uint64,
serverKey string, serverKey string,
serverCert string, serverCert string,
parent Parent) (*Server, error) { parent Parent) (*Server, error) {
@ -40,9 +41,10 @@ func New(port int,
} }
conf := gortsplib.ServerConf{ conf := gortsplib.ServerConf{
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
ReadTimeout: readTimeout, ReadTimeout: readTimeout,
WriteTimeout: writeTimeout, WriteTimeout: writeTimeout,
ReadBufferCount: readBufferCount,
} }
srv, err := conf.Serve(":" + strconv.FormatInt(int64(port), 10)) srv, err := conf.Serve(":" + strconv.FormatInt(int64(port), 10))

35
internal/sourcertsp/source.go

@ -26,13 +26,14 @@ type Parent interface {
// Source is a RTSP source. // Source is a RTSP source.
type Source struct { type Source struct {
ur string ur string
proto *gortsplib.StreamProtocol proto *gortsplib.StreamProtocol
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
wg *sync.WaitGroup readBufferCount uint64
stats *stats.Stats wg *sync.WaitGroup
parent Parent stats *stats.Stats
parent Parent
// in // in
terminate chan struct{} terminate chan struct{}
@ -43,18 +44,20 @@ func New(ur string,
proto *gortsplib.StreamProtocol, proto *gortsplib.StreamProtocol,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
readBufferCount uint64,
wg *sync.WaitGroup, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
parent Parent) *Source { parent Parent) *Source {
s := &Source{ s := &Source{
ur: ur, ur: ur,
proto: proto, proto: proto,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
wg: wg, readBufferCount: readBufferCount,
stats: stats, wg: wg,
parent: parent, stats: stats,
terminate: make(chan struct{}), parent: parent,
terminate: make(chan struct{}),
} }
atomic.AddInt64(s.stats.CountSourcesRtsp, +1) atomic.AddInt64(s.stats.CountSourcesRtsp, +1)
@ -121,7 +124,7 @@ func (s *Source) runInner() bool {
StreamProtocol: s.proto, StreamProtocol: s.proto,
ReadTimeout: s.readTimeout, ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout, WriteTimeout: s.writeTimeout,
ReadBufferCount: 1024, ReadBufferCount: s.readBufferCount,
OnRequest: func(req *base.Request) { OnRequest: func(req *base.Request) {
s.log(logger.Debug, "c->s %v", req) s.log(logger.Debug, "c->s %v", req)
}, },

22
main.go

@ -34,7 +34,7 @@ type program struct {
pprof *pprof.Pprof pprof *pprof.Pprof
serverUDPRTP *gortsplib.ServerUDPListener serverUDPRTP *gortsplib.ServerUDPListener
serverUDPRTCP *gortsplib.ServerUDPListener serverUDPRTCP *gortsplib.ServerUDPListener
serverTCP *serverplain.Server serverPlain *serverplain.Server
serverTLS *servertls.Server serverTLS *servertls.Server
pathMan *pathman.PathManager pathMan *pathman.PathManager
clientMan *clientman.ClientManager clientMan *clientman.ClientManager
@ -202,12 +202,13 @@ func (p *program) createResources(initial bool) error {
} }
} }
if p.serverTCP == nil { if p.serverPlain == nil {
if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional { if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional {
p.serverTCP, err = serverplain.New( p.serverPlain, err = serverplain.New(
p.conf.RtspPort, p.conf.RtspPort,
p.conf.ReadTimeout, p.conf.ReadTimeout,
p.conf.WriteTimeout, p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.serverUDPRTP, p.serverUDPRTP,
p.serverUDPRTCP, p.serverUDPRTCP,
p) p)
@ -223,6 +224,7 @@ func (p *program) createResources(initial bool) error {
p.conf.RtspsPort, p.conf.RtspsPort,
p.conf.ReadTimeout, p.conf.ReadTimeout,
p.conf.WriteTimeout, p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.conf.ServerKey, p.conf.ServerKey,
p.conf.ServerCert, p.conf.ServerCert,
p) p)
@ -237,6 +239,7 @@ func (p *program) createResources(initial bool) error {
p.conf.RtspPort, p.conf.RtspPort,
p.conf.ReadTimeout, p.conf.ReadTimeout,
p.conf.WriteTimeout, p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.conf.AuthMethodsParsed, p.conf.AuthMethodsParsed,
p.conf.Paths, p.conf.Paths,
p.stats, p.stats,
@ -252,7 +255,7 @@ func (p *program) createResources(initial bool) error {
p.conf.ProtocolsParsed, p.conf.ProtocolsParsed,
p.stats, p.stats,
p.pathMan, p.pathMan,
p.serverTCP, p.serverPlain,
p.serverTLS, p.serverTLS,
p) p)
} }
@ -302,6 +305,7 @@ func (p *program) closeResources(newConf *conf.Conf) {
newConf.RtspPort != p.conf.RtspPort || newConf.RtspPort != p.conf.RtspPort ||
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
closeServerUDPRTP || closeServerUDPRTP ||
closeServerUDPRTCP { closeServerUDPRTCP {
closeServerPlain = true closeServerPlain = true
@ -312,7 +316,8 @@ func (p *program) closeResources(newConf *conf.Conf) {
newConf.EncryptionParsed != p.conf.EncryptionParsed || newConf.EncryptionParsed != p.conf.EncryptionParsed ||
newConf.RtspsPort != p.conf.RtspsPort || newConf.RtspsPort != p.conf.RtspsPort ||
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout { newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount {
closeServerTLS = true closeServerTLS = true
} }
@ -321,6 +326,7 @@ func (p *program) closeResources(newConf *conf.Conf) {
newConf.RtspPort != p.conf.RtspPort || newConf.RtspPort != p.conf.RtspPort ||
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
!reflect.DeepEqual(newConf.AuthMethodsParsed, p.conf.AuthMethodsParsed) { !reflect.DeepEqual(newConf.AuthMethodsParsed, p.conf.AuthMethodsParsed) {
closePathMan = true closePathMan = true
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { } else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
@ -360,9 +366,9 @@ func (p *program) closeResources(newConf *conf.Conf) {
p.serverTLS = nil p.serverTLS = nil
} }
if closeServerPlain && p.serverTCP != nil { if closeServerPlain && p.serverPlain != nil {
p.serverTCP.Close() p.serverPlain.Close()
p.serverTCP = nil p.serverPlain = nil
} }
if closeServerUDPRTCP && p.serverUDPRTCP != nil { if closeServerUDPRTCP && p.serverUDPRTCP != nil {

4
rtsp-simple-server.yml

@ -33,6 +33,10 @@ authMethods: [basic, digest]
readTimeout: 10s readTimeout: 10s
# timeout of write operations. # timeout of write operations.
writeTimeout: 10s writeTimeout: 10s
# number of read buffers.
# a higher number allows a higher throughput,
# a lower number allows to save RAM.
readBufferCount: 512
# enable Prometheus-compatible metrics on port 9998. # enable Prometheus-compatible metrics on port 9998.
metrics: no metrics: no

Loading…
Cancel
Save