Browse Source

rename client into readpublisher

pull/372/head
aler9 5 years ago
parent
commit
681db142f7
  1. 30
      internal/clienthls/client.go
  2. 52
      internal/clientrtmp/client.go
  3. 68
      internal/clientrtsp/client.go
  4. 226
      internal/path/path.go
  5. 50
      internal/pathman/pathman.go
  6. 58
      internal/readpublisher/readpublisher.go
  7. 2
      internal/rtmp/client.go
  8. 2
      internal/serverhls/server.go
  9. 2
      internal/serverrtmp/server.go
  10. 2
      internal/serverrtsp/server.go
  11. 4
      internal/source/source.go

30
internal/clienthls/client.go

@ -18,9 +18,9 @@ import (
"github.com/aler9/gortsplib/pkg/rtpaac" "github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264" "github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/h264" "github.com/aler9/rtsp-simple-server/internal/h264"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
) )
@ -116,10 +116,10 @@ type trackIDPayloadPair struct {
// PathMan is implemented by pathman.PathMan. // PathMan is implemented by pathman.PathMan.
type PathMan interface { type PathMan interface {
OnClientSetupPlay(client.SetupPlayReq) OnReadPublisherSetupPlay(readpublisher.SetupPlayReq)
} }
// Parent is implemented by clientman.ClientMan. // Parent is implemented by serverhls.Server.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(*Client) OnClientClose(*Client)
@ -136,7 +136,7 @@ type Client struct {
pathMan PathMan pathMan PathMan
parent Parent parent Parent
path client.Path path readpublisher.Path
ringBuffer *ringbuffer.RingBuffer ringBuffer *ringbuffer.RingBuffer
tsQueue []*tsFile tsQueue []*tsFile
tsByName map[string]*tsFile tsByName map[string]*tsFile
@ -196,8 +196,8 @@ func (c *Client) CloseRequest() {
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
} }
// IsClient implements client.Client. // IsReadPublisher implements readpublisher.ReadPublisher.
func (c *Client) IsClient() {} func (c *Client) IsReadPublisher() {}
// IsSource implements path.source. // IsSource implements path.source.
func (c *Client) IsSource() {} func (c *Client) IsSource() {}
@ -206,7 +206,7 @@ func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...) c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...)
} }
// PathName returns the path name of the client. // PathName returns the path name of the readpublisher.
func (c *Client) PathName() string { func (c *Client) PathName() string {
return c.pathName return c.pathName
} }
@ -223,8 +223,8 @@ func (c *Client) run() {
var aacDecoder *rtpaac.Decoder var aacDecoder *rtpaac.Decoder
err := func() error { err := func() error {
pres := make(chan client.SetupPlayRes) pres := make(chan readpublisher.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, c.pathName, nil, pres}) //nolint:govet c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, c.pathName, nil, pres}) //nolint:govet
res := <-pres res := <-pres
if res.Err != nil { if res.Err != nil {
@ -286,7 +286,7 @@ func (c *Client) run() {
if c.path != nil { if c.path != nil {
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
} }
@ -315,8 +315,8 @@ func (c *Client) run() {
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
resc := make(chan client.PlayRes) resc := make(chan readpublisher.PlayRes)
c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet
<-resc <-resc
c.log(logger.Info, "is reading from path '%s'", c.pathName) c.log(logger.Info, "is reading from path '%s'", c.pathName)
@ -483,7 +483,7 @@ func (c *Client) run() {
<-writerDone <-writerDone
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
@ -495,7 +495,7 @@ func (c *Client) run() {
c.log(logger.Info, "ERR: %s", err) c.log(logger.Info, "ERR: %s", err)
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
@ -504,7 +504,7 @@ func (c *Client) run() {
case <-c.terminate: case <-c.terminate:
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.ringBuffer.Close() c.ringBuffer.Close()

52
internal/clientrtmp/client.go

@ -18,10 +18,10 @@ import (
"github.com/aler9/gortsplib/pkg/rtph264" "github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av" "github.com/notedit/rtmp/av"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/h264" "github.com/aler9/rtsp-simple-server/internal/h264"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/rtcpsenderset" "github.com/aler9/rtsp-simple-server/internal/rtcpsenderset"
"github.com/aler9/rtsp-simple-server/internal/rtmp" "github.com/aler9/rtsp-simple-server/internal/rtmp"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
@ -68,11 +68,11 @@ type trackIDPayloadPair struct {
// PathMan is implemented by pathman.PathMan. // PathMan is implemented by pathman.PathMan.
type PathMan interface { type PathMan interface {
OnClientSetupPlay(client.SetupPlayReq) OnReadPublisherSetupPlay(readpublisher.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq) OnReadPublisherAnnounce(readpublisher.AnnounceReq)
} }
// Parent is implemented by clientman.ClientMan. // Parent is implemented by serverrtmp.Server.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(*Client) OnClientClose(*Client)
@ -149,8 +149,8 @@ func (c *Client) CloseRequest() {
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
} }
// IsClient implements client.Client. // IsReadPublisher implements readpublisher.ReadPublisher.
func (c *Client) IsClient() {} func (c *Client) IsReadPublisher() {}
// IsSource implements path.source. // IsSource implements path.source.
func (c *Client) IsSource() {} func (c *Client) IsSource() {}
@ -195,7 +195,7 @@ func (c *Client) run() {
} }
func (c *Client) runRead() { func (c *Client) runRead() {
var path client.Path var path readpublisher.Path
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
var h264Decoder *rtph264.Decoder var h264Decoder *rtph264.Decoder
var audioTrack *gortsplib.Track var audioTrack *gortsplib.Track
@ -205,12 +205,12 @@ func (c *Client) runRead() {
err := func() error { err := func() error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
sres := make(chan client.SetupPlayRes) sres := make(chan readpublisher.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, sres}) //nolint:govet c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, pathName, query, sres}) //nolint:govet
res := <-sres res := <-sres
if res.Err != nil { if res.Err != nil {
if _, ok := res.Err.(client.ErrAuthCritical); ok { if _, ok := res.Err.(readpublisher.ErrAuthCritical); ok {
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
select { select {
case <-time.After(pauseAfterAuthError): case <-time.After(pauseAfterAuthError):
@ -257,7 +257,7 @@ func (c *Client) runRead() {
if path != nil { if path != nil {
res := make(chan struct{}) res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
} }
@ -268,8 +268,8 @@ func (c *Client) runRead() {
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
pres := make(chan client.PlayRes) pres := make(chan readpublisher.PlayRes)
path.OnClientPlay(client.PlayReq{c, pres}) //nolint:govet path.OnReadPublisherPlay(readpublisher.PlayReq{c, pres}) //nolint:govet
<-pres <-pres
c.log(logger.Info, "is reading from path '%s'", path.Name()) c.log(logger.Info, "is reading from path '%s'", path.Name())
@ -370,7 +370,7 @@ func (c *Client) runRead() {
} }
res := make(chan struct{}) res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
@ -378,7 +378,7 @@ func (c *Client) runRead() {
case <-c.terminate: case <-c.terminate:
res := make(chan struct{}) res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.ringBuffer.Close() c.ringBuffer.Close()
@ -394,7 +394,7 @@ func (c *Client) runPublish() {
var tracks gortsplib.Tracks var tracks gortsplib.Tracks
var h264Encoder *rtph264.Encoder var h264Encoder *rtph264.Encoder
var aacEncoder *rtpaac.Encoder var aacEncoder *rtpaac.Encoder
var path client.Path var path readpublisher.Path
setupDone := make(chan struct{}) setupDone := make(chan struct{})
go func() { go func() {
@ -423,12 +423,12 @@ func (c *Client) runPublish() {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.AnnounceRes) resc := make(chan readpublisher.AnnounceRes)
c.pathMan.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet c.pathMan.OnReadPublisherAnnounce(readpublisher.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
if _, ok := res.Err.(client.ErrAuthCritical); ok { if _, ok := res.Err.(readpublisher.ErrAuthCritical); ok {
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
select { select {
case <-time.After(pauseAfterAuthError): case <-time.After(pauseAfterAuthError):
@ -465,8 +465,8 @@ func (c *Client) runPublish() {
readerDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
readerDone <- func() error { readerDone <- func() error {
resc := make(chan client.RecordRes) resc := make(chan readpublisher.RecordRes)
path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) path.OnReadPublisherRecord(readpublisher.RecordReq{ReadPublisher: c, Res: resc})
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
@ -493,7 +493,7 @@ func (c *Client) runPublish() {
}) })
} }
defer func(path client.Path) { defer func(path readpublisher.Path) {
if path.Conf().RunOnPublish != "" { if path.Conf().RunOnPublish != "" {
onPublishCmd.Close() onPublishCmd.Close()
} }
@ -581,7 +581,7 @@ func (c *Client) runPublish() {
} }
res := make(chan struct{}) res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
path = nil path = nil
@ -593,7 +593,7 @@ func (c *Client) runPublish() {
<-readerDone <-readerDone
res := make(chan struct{}) res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
path = nil path = nil
} }
@ -611,7 +611,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
if !ipEqualOrInRange(ip, ips) { if !ipEqualOrInRange(ip, ips) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip) c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return client.ErrAuthCritical{&base.Response{ //nolint:govet return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
}} }}
} }
@ -623,7 +623,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
if values.Get("user") != user || if values.Get("user") != user ||
values.Get("pass") != pass { values.Get("pass") != pass {
return client.ErrAuthCritical{nil} //nolint:govet return readpublisher.ErrAuthCritical{nil} //nolint:govet
} }
} }

68
internal/clientrtsp/client.go

@ -16,9 +16,9 @@ import (
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/liberrors" "github.com/aler9/gortsplib/pkg/liberrors"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc" "github.com/aler9/rtsp-simple-server/internal/streamproc"
) )
@ -47,12 +47,12 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
// PathMan is implemented by pathman.PathMan. // PathMan is implemented by pathman.PathMan.
type PathMan interface { type PathMan interface {
OnClientDescribe(client.DescribeReq) OnReadPublisherDescribe(readpublisher.DescribeReq)
OnClientSetupPlay(client.SetupPlayReq) OnReadPublisherSetupPlay(readpublisher.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq) OnReadPublisherAnnounce(readpublisher.AnnounceReq)
} }
// Parent is implemented by clientman.ClientMan. // Parent is implemented by serverrtsp.Server.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(*Client) OnClientClose(*Client)
@ -71,7 +71,7 @@ type Client struct {
pathMan PathMan pathMan PathMan
parent Parent parent Parent
path client.Path path readpublisher.Path
authUser string authUser string
authPass string authPass string
authValidator *auth.Validator authValidator *auth.Validator
@ -138,8 +138,8 @@ func (c *Client) CloseRequest() {
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
} }
// IsClient implements client.Client. // IsReadPublisher implements readpublisher.ReadPublisher.
func (c *Client) IsClient() {} func (c *Client) IsReadPublisher() {}
// IsSource implements path.source. // IsSource implements path.source.
func (c *Client) IsSource() {} func (c *Client) IsSource() {}
@ -175,16 +175,16 @@ func (c *Client) run() {
} }
onDescribe := func(ctx *gortsplib.ServerConnDescribeCtx) (*base.Response, []byte, error) { onDescribe := func(ctx *gortsplib.ServerConnDescribeCtx) (*base.Response, []byte, error) {
resc := make(chan client.DescribeRes) resc := make(chan readpublisher.DescribeRes)
c.pathMan.OnClientDescribe(client.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case client.ErrAuthNotCritical: case readpublisher.ErrAuthNotCritical:
return terr.Response, nil, nil return terr.Response, nil, nil
case client.ErrAuthCritical: case readpublisher.ErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
select { select {
case <-time.After(pauseAfterAuthError): case <-time.After(pauseAfterAuthError):
@ -192,7 +192,7 @@ func (c *Client) run() {
} }
return terr.Response, nil, errTerminated return terr.Response, nil, errTerminated
case client.ErrNoOnePublishing: case readpublisher.ErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, nil, res.Err }, nil, res.Err
@ -219,16 +219,16 @@ func (c *Client) run() {
} }
onAnnounce := func(ctx *gortsplib.ServerConnAnnounceCtx) (*base.Response, error) { onAnnounce := func(ctx *gortsplib.ServerConnAnnounceCtx) (*base.Response, error) {
resc := make(chan client.AnnounceRes) resc := make(chan readpublisher.AnnounceRes)
c.pathMan.OnClientAnnounce(client.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet c.pathMan.OnReadPublisherAnnounce(readpublisher.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case client.ErrAuthNotCritical: case readpublisher.ErrAuthNotCritical:
return terr.Response, nil return terr.Response, nil
case client.ErrAuthCritical: case readpublisher.ErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
select { select {
case <-time.After(pauseAfterAuthError): case <-time.After(pauseAfterAuthError):
@ -267,16 +267,16 @@ func (c *Client) run() {
switch c.conn.State() { switch c.conn.State() {
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
resc := make(chan client.SetupPlayRes) resc := make(chan readpublisher.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
switch terr := res.Err.(type) { switch terr := res.Err.(type) {
case client.ErrAuthNotCritical: case readpublisher.ErrAuthNotCritical:
return terr.Response, nil return terr.Response, nil
case client.ErrAuthCritical: case readpublisher.ErrAuthCritical:
// wait some seconds to stop brute force attacks // wait some seconds to stop brute force attacks
select { select {
case <-time.After(pauseAfterAuthError): case <-time.After(pauseAfterAuthError):
@ -284,7 +284,7 @@ func (c *Client) run() {
} }
return terr.Response, errTerminated return terr.Response, errTerminated
case client.ErrNoOnePublishing: case readpublisher.ErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, res.Err }, res.Err
@ -400,13 +400,13 @@ func (c *Client) run() {
case gortsplib.ServerConnStatePlay: case gortsplib.ServerConnStatePlay:
c.playStop() c.playStop()
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet c.path.OnReadPublisherPause(readpublisher.PauseReq{c, res}) //nolint:govet
<-res <-res
case gortsplib.ServerConnStateRecord: case gortsplib.ServerConnStateRecord:
c.recordStop() c.recordStop()
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet c.path.OnReadPublisherPause(readpublisher.PauseReq{c, res}) //nolint:govet
<-res <-res
} }
@ -458,7 +458,7 @@ func (c *Client) run() {
if c.path != nil { if c.path != nil {
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.path = nil c.path = nil
} }
@ -480,7 +480,7 @@ func (c *Client) run() {
if c.path != nil { if c.path != nil {
res := make(chan struct{}) res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
<-res <-res
c.path = nil c.path = nil
} }
@ -499,7 +499,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
if !ipEqualOrInRange(ip, ips) { if !ipEqualOrInRange(ip, ips) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip) c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return client.ErrAuthCritical{&base.Response{ //nolint:govet return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
}} }}
} }
@ -543,7 +543,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
if c.authFailures > 3 { if c.authFailures > 3 {
c.log(logger.Info, "ERR: unauthorized: %s", err) c.log(logger.Info, "ERR: unauthorized: %s", err)
return client.ErrAuthCritical{&base.Response{ //nolint:govet return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
Header: base.Header{ Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(), "WWW-Authenticate": c.authValidator.GenerateHeader(),
@ -555,7 +555,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
c.log(logger.Debug, "WARN: unauthorized: %s", err) c.log(logger.Debug, "WARN: unauthorized: %s", err)
} }
return client.ErrAuthNotCritical{&base.Response{ //nolint:govet return readpublisher.ErrAuthNotCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized, StatusCode: base.StatusUnauthorized,
Header: base.Header{ Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(), "WWW-Authenticate": c.authValidator.GenerateHeader(),
@ -570,9 +570,9 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
return nil return nil
} }
func (c *Client) playStart() client.PlayRes { func (c *Client) playStart() readpublisher.PlayRes {
resc := make(chan client.PlayRes) resc := make(chan readpublisher.PlayRes)
c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet
res := <-resc res := <-resc
tracksLen := len(c.conn.SetuppedTracks()) tracksLen := len(c.conn.SetuppedTracks())
@ -606,8 +606,8 @@ func (c *Client) playStop() {
} }
func (c *Client) recordStart() error { func (c *Client) recordStart() error {
resc := make(chan client.RecordRes) resc := make(chan readpublisher.RecordRes)
c.path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) c.path.OnReadPublisherRecord(readpublisher.RecordReq{ReadPublisher: c, Res: resc})
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {

226
internal/path/path.go

@ -11,10 +11,10 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/sourcertmp" "github.com/aler9/rtsp-simple-server/internal/sourcertmp"
"github.com/aler9/rtsp-simple-server/internal/sourcertsp" "github.com/aler9/rtsp-simple-server/internal/sourcertsp"
@ -70,10 +70,10 @@ type Path struct {
stats *stats.Stats stats *stats.Stats
parent Parent parent Parent
clients map[client.Client]clientState readPublishers map[readpublisher.ReadPublisher]clientState
clientsWg sync.WaitGroup readPublishersWg sync.WaitGroup
describeRequests []client.DescribeReq describeRequests []readpublisher.DescribeReq
setupPlayRequests []client.SetupPlayReq setupPlayRequests []readpublisher.SetupPlayReq
source source.Source source source.Source
sourceTracks gortsplib.Tracks sourceTracks gortsplib.Tracks
sp *streamproc.StreamProc sp *streamproc.StreamProc
@ -92,13 +92,13 @@ type Path struct {
// in // in
extSourceSetReady chan source.ExtSetReadyReq extSourceSetReady chan source.ExtSetReadyReq
extSourceSetNotReady chan source.ExtSetNotReadyReq extSourceSetNotReady chan source.ExtSetNotReadyReq
clientDescribe chan client.DescribeReq clientDescribe chan readpublisher.DescribeReq
clientSetupPlay chan client.SetupPlayReq clientSetupPlay chan readpublisher.SetupPlayReq
clientAnnounce chan client.AnnounceReq clientAnnounce chan readpublisher.AnnounceReq
clientPlay chan client.PlayReq clientPlay chan readpublisher.PlayReq
clientRecord chan client.RecordReq clientRecord chan readpublisher.RecordReq
clientPause chan client.PauseReq clientPause chan readpublisher.PauseReq
clientRemove chan client.RemoveReq clientRemove chan readpublisher.RemoveReq
terminate chan struct{} terminate chan struct{}
} }
@ -128,7 +128,7 @@ func New(
wg: wg, wg: wg,
stats: stats, stats: stats,
parent: parent, parent: parent,
clients: make(map[client.Client]clientState), readPublishers: make(map[readpublisher.ReadPublisher]clientState),
readers: newReadersMap(), readers: newReadersMap(),
describeTimer: newEmptyTimer(), describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(),
@ -136,13 +136,13 @@ func New(
closeTimer: newEmptyTimer(), closeTimer: newEmptyTimer(),
extSourceSetReady: make(chan source.ExtSetReadyReq), extSourceSetReady: make(chan source.ExtSetReadyReq),
extSourceSetNotReady: make(chan source.ExtSetNotReadyReq), extSourceSetNotReady: make(chan source.ExtSetNotReadyReq),
clientDescribe: make(chan client.DescribeReq), clientDescribe: make(chan readpublisher.DescribeReq),
clientSetupPlay: make(chan client.SetupPlayReq), clientSetupPlay: make(chan readpublisher.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq), clientAnnounce: make(chan readpublisher.AnnounceReq),
clientPlay: make(chan client.PlayReq), clientPlay: make(chan readpublisher.PlayReq),
clientRecord: make(chan client.RecordReq), clientRecord: make(chan readpublisher.RecordReq),
clientPause: make(chan client.PauseReq), clientPause: make(chan readpublisher.PauseReq),
clientRemove: make(chan client.RemoveReq), clientRemove: make(chan readpublisher.RemoveReq),
terminate: make(chan struct{}), terminate: make(chan struct{}),
} }
@ -186,16 +186,16 @@ outer:
select { select {
case <-pa.describeTimer.C: case <-pa.describeTimer.C:
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
} }
pa.describeRequests = nil pa.describeRequests = nil
for _, req := range pa.setupPlayRequests { for _, req := range pa.setupPlayRequests {
req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
} }
pa.setupPlayRequests = nil pa.setupPlayRequests = nil
// set state after removeClient(), so schedule* works once // set state after removeReadPublisher(), so schedule* works once
pa.sourceState = sourceStateNotReady pa.sourceState = sourceStateNotReady
pa.scheduleSourceClose() pa.scheduleSourceClose()
@ -234,35 +234,35 @@ outer:
close(req.Res) close(req.Res)
case req := <-pa.clientDescribe: case req := <-pa.clientDescribe:
pa.onClientDescribe(req) pa.onReadPublisherDescribe(req)
case req := <-pa.clientSetupPlay: case req := <-pa.clientSetupPlay:
pa.onClientSetupPlay(req) pa.onReadPublisherSetupPlay(req)
case req := <-pa.clientAnnounce: case req := <-pa.clientAnnounce:
pa.onClientAnnounce(req) pa.onReadPublisherAnnounce(req)
case req := <-pa.clientPlay: case req := <-pa.clientPlay:
pa.onClientPlay(req) pa.onReadPublisherPlay(req)
case req := <-pa.clientRecord: case req := <-pa.clientRecord:
pa.onClientRecord(req) pa.onReadPublisherRecord(req)
case req := <-pa.clientPause: case req := <-pa.clientPause:
pa.onClientPause(req) pa.onReadPublisherPause(req)
case req := <-pa.clientRemove: case req := <-pa.clientRemove:
if _, ok := pa.clients[req.Client]; !ok { if _, ok := pa.readPublishers[req.ReadPublisher]; !ok {
close(req.Res) close(req.Res)
continue continue
} }
if pa.clients[req.Client] != clientStatePreRemove { if pa.readPublishers[req.ReadPublisher] != clientStatePreRemove {
pa.removeClient(req.Client) pa.removeReadPublisher(req.ReadPublisher)
} }
delete(pa.clients, req.Client) delete(pa.readPublishers, req.ReadPublisher)
pa.clientsWg.Done() pa.readPublishersWg.Done()
close(req.Res) close(req.Res)
case <-pa.terminate: case <-pa.terminate:
@ -292,14 +292,14 @@ outer:
} }
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
} }
for _, req := range pa.setupPlayRequests { for _, req := range pa.setupPlayRequests {
req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
} }
for c, state := range pa.clients { for c, state := range pa.readPublishers {
if state != clientStatePreRemove { if state != clientStatePreRemove {
switch state { switch state {
case clientStatePlay: case clientStatePlay:
@ -312,7 +312,7 @@ outer:
c.CloseRequest() c.CloseRequest()
} }
} }
pa.clientsWg.Wait() pa.readPublishersWg.Wait()
close(pa.extSourceSetReady) close(pa.extSourceSetReady)
close(pa.extSourceSetNotReady) close(pa.extSourceSetNotReady)
@ -345,19 +345,19 @@ func (pa *Path) exhaustChannels() {
if !ok { if !ok {
return return
} }
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientSetupPlay: case req, ok := <-pa.clientSetupPlay:
if !ok { if !ok {
return return
} }
req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientAnnounce: case req, ok := <-pa.clientAnnounce:
if !ok { if !ok {
return return
} }
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientPlay: case req, ok := <-pa.clientPlay:
if !ok { if !ok {
@ -382,12 +382,12 @@ func (pa *Path) exhaustChannels() {
return return
} }
if _, ok := pa.clients[req.Client]; !ok { if _, ok := pa.readPublishers[req.ReadPublisher]; !ok {
close(req.Res) close(req.Res)
continue continue
} }
pa.clientsWg.Done() pa.readPublishersWg.Done()
close(req.Res) close(req.Res)
} }
} }
@ -426,8 +426,8 @@ func (pa *Path) startExternalSource() {
} }
} }
func (pa *Path) hasClients() bool { func (pa *Path) hasReadPublishers() bool {
for _, state := range pa.clients { for _, state := range pa.readPublishers {
if state != clientStatePreRemove { if state != clientStatePreRemove {
return true return true
} }
@ -435,8 +435,8 @@ func (pa *Path) hasClients() bool {
return false return false
} }
func (pa *Path) hasClientsNotSources() bool { func (pa *Path) hasReadPublishersNotSources() bool {
for c, state := range pa.clients { for c, state := range pa.readPublishers {
if state != clientStatePreRemove && c != pa.source { if state != clientStatePreRemove && c != pa.source {
return true return true
} }
@ -444,14 +444,14 @@ func (pa *Path) hasClientsNotSources() bool {
return false return false
} }
func (pa *Path) addClient(c client.Client, state clientState) { func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state clientState) {
pa.clients[c] = state pa.readPublishers[c] = state
pa.clientsWg.Add(1) pa.readPublishersWg.Add(1)
} }
func (pa *Path) removeClient(c client.Client) { func (pa *Path) removeReadPublisher(c readpublisher.ReadPublisher) {
state := pa.clients[c] state := pa.readPublishers[c]
pa.clients[c] = clientStatePreRemove pa.readPublishers[c] = clientStatePreRemove
switch state { switch state {
case clientStatePlay: case clientStatePlay:
@ -466,10 +466,10 @@ func (pa *Path) removeClient(c client.Client) {
if pa.source == c { if pa.source == c {
pa.source = nil pa.source = nil
// close all clients that are reading or waiting to read // close all readPublishers that are reading or waiting to read
for oc, state := range pa.clients { for oc, state := range pa.readPublishers {
if state != clientStatePreRemove { if state != clientStatePreRemove {
pa.removeClient(oc) pa.removeReadPublisher(oc)
oc.CloseRequest() oc.CloseRequest()
} }
} }
@ -489,12 +489,12 @@ func (pa *Path) onSourceSetReady() {
pa.sourceState = sourceStateReady pa.sourceState = sourceStateReady
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet
} }
pa.describeRequests = nil pa.describeRequests = nil
for _, req := range pa.setupPlayRequests { for _, req := range pa.setupPlayRequests {
pa.onClientSetupPlayPost(req) pa.onReadPublisherSetupPlayPost(req)
} }
pa.setupPlayRequests = nil pa.setupPlayRequests = nil
@ -506,10 +506,10 @@ func (pa *Path) onSourceSetReady() {
func (pa *Path) onSourceSetNotReady() { func (pa *Path) onSourceSetNotReady() {
pa.sourceState = sourceStateNotReady pa.sourceState = sourceStateNotReady
// close all clients that are reading or waiting to read // close all readPublishers that are reading or waiting to read
for c, state := range pa.clients { for c, state := range pa.readPublishers {
if c != pa.source && state != clientStatePreRemove { if c != pa.source && state != clientStatePreRemove {
pa.removeClient(c) pa.removeReadPublisher(c)
c.CloseRequest() c.CloseRequest()
} }
} }
@ -556,9 +556,9 @@ func (pa *Path) fixedPublisherStart() {
} }
} }
func (pa *Path) onClientDescribe(req client.DescribeReq) { func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
if _, ok := pa.clients[req.Client]; ok { if _, ok := pa.readPublishers[req.ReadPublisher]; ok {
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet
return return
} }
@ -566,13 +566,13 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) {
pa.scheduleClose() pa.scheduleClose()
if _, ok := pa.source.(*sourceRedirect); ok { if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- client.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet
return return
} }
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case sourceStateReady:
req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet
return return
case sourceStateWaitingDescribe: case sourceStateWaitingDescribe:
@ -593,22 +593,22 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) {
} }
return pa.conf.Fallback return pa.conf.Fallback
}() }()
req.Res <- client.DescribeRes{nil, fallbackURL, nil} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, fallbackURL, nil} //nolint:govet
return return
} }
req.Res <- client.DescribeRes{nil, "", client.ErrNoOnePublishing{pa.name}} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet
return return
} }
} }
func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) { func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
pa.fixedPublisherStart() pa.fixedPublisherStart()
pa.scheduleClose() pa.scheduleClose()
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case sourceStateReady:
pa.onClientSetupPlayPost(req) pa.onReadPublisherSetupPlayPost(req)
return return
case sourceStateWaitingDescribe: case sourceStateWaitingDescribe:
@ -616,13 +616,13 @@ func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) {
return return
case sourceStateNotReady: case sourceStateNotReady:
req.Res <- client.SetupPlayRes{nil, nil, client.ErrNoOnePublishing{pa.name}} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet
return return
} }
} }
func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) {
if _, ok := pa.clients[req.Client]; !ok { if _, ok := pa.readPublishers[req.ReadPublisher]; !ok {
// prevent on-demand source from closing // prevent on-demand source from closing
if pa.sourceCloseTimerStarted { if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer = newEmptyTimer() pa.sourceCloseTimer = newEmptyTimer()
@ -635,40 +635,40 @@ func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
pa.runOnDemandCloseTimerStarted = false pa.runOnDemandCloseTimerStarted = false
} }
pa.addClient(req.Client, clientStatePrePlay) pa.addReadPublisher(req.ReadPublisher, clientStatePrePlay)
} }
req.Res <- client.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet req.Res <- readpublisher.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet
} }
func (pa *Path) onClientPlay(req client.PlayReq) { func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1) atomic.AddInt64(pa.stats.CountReaders, 1)
pa.clients[req.Client] = clientStatePlay pa.readPublishers[req.ReadPublisher] = clientStatePlay
pa.readers.add(req.Client) pa.readers.add(req.ReadPublisher)
req.Res <- client.PlayRes{TrackInfos: pa.sp.TrackInfos()} req.Res <- readpublisher.PlayRes{TrackInfos: pa.sp.TrackInfos()}
} }
func (pa *Path) onClientAnnounce(req client.AnnounceReq) { func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
if _, ok := pa.clients[req.Client]; ok { if _, ok := pa.readPublishers[req.ReadPublisher]; ok {
req.Res <- client.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet
return return
} }
if pa.hasExternalSource() { if pa.hasExternalSource() {
req.Res <- client.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet
return return
} }
if pa.source != nil { if pa.source != nil {
if pa.conf.DisablePublisherOverride { if pa.conf.DisablePublisherOverride {
req.Res <- client.AnnounceRes{nil, fmt.Errorf("another client is already publishing on path '%s'", pa.name)} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("another client is already publishing on path '%s'", pa.name)} //nolint:govet
return return
} }
pa.Log(logger.Info, "disconnecting existing publisher") pa.Log(logger.Info, "disconnecting existing publisher")
curPublisher := pa.source.(client.Client) curPublisher := pa.source.(readpublisher.ReadPublisher)
pa.removeClient(curPublisher) pa.removeReadPublisher(curPublisher)
curPublisher.CloseRequest() curPublisher.CloseRequest()
// prevent path closure // prevent path closure
@ -679,30 +679,30 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
} }
} }
pa.addClient(req.Client, clientStatePreRecord) pa.addReadPublisher(req.ReadPublisher, clientStatePreRecord)
pa.source = req.Client pa.source = req.ReadPublisher
pa.sourceTracks = req.Tracks pa.sourceTracks = req.Tracks
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet
} }
func (pa *Path) onClientRecord(req client.RecordReq) { func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) {
if state, ok := pa.clients[req.Client]; !ok || state != clientStatePreRecord { if state, ok := pa.readPublishers[req.ReadPublisher]; !ok || state != clientStatePreRecord {
req.Res <- client.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")} req.Res <- readpublisher.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")}
return return
} }
atomic.AddInt64(pa.stats.CountPublishers, 1) atomic.AddInt64(pa.stats.CountPublishers, 1)
pa.clients[req.Client] = clientStateRecord pa.readPublishers[req.ReadPublisher] = clientStateRecord
pa.onSourceSetReady() pa.onSourceSetReady()
pa.sp = streamproc.New(pa, len(pa.sourceTracks)) pa.sp = streamproc.New(pa, len(pa.sourceTracks))
req.Res <- client.RecordRes{SP: pa.sp, Err: nil} req.Res <- readpublisher.RecordRes{SP: pa.sp, Err: nil}
} }
func (pa *Path) onClientPause(req client.PauseReq) { func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
state, ok := pa.clients[req.Client] state, ok := pa.readPublishers[req.ReadPublisher]
if !ok { if !ok {
close(req.Res) close(req.Res)
return return
@ -710,12 +710,12 @@ func (pa *Path) onClientPause(req client.PauseReq) {
if state == clientStatePlay { if state == clientStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.clients[req.Client] = clientStatePrePlay pa.readPublishers[req.ReadPublisher] = clientStatePrePlay
pa.readers.remove(req.Client) pa.readers.remove(req.ReadPublisher)
} else if state == clientStateRecord { } else if state == clientStateRecord {
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.clients[req.Client] = clientStatePreRecord pa.readPublishers[req.ReadPublisher] = clientStatePreRecord
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
} }
@ -729,7 +729,7 @@ func (pa *Path) scheduleSourceClose() {
if pa.sourceCloseTimerStarted || if pa.sourceCloseTimerStarted ||
pa.sourceState == sourceStateWaitingDescribe || pa.sourceState == sourceStateWaitingDescribe ||
pa.hasClients() { pa.hasReadPublishers() {
return return
} }
@ -745,7 +745,7 @@ func (pa *Path) scheduleRunOnDemandClose() {
if pa.runOnDemandCloseTimerStarted || if pa.runOnDemandCloseTimerStarted ||
pa.sourceState == sourceStateWaitingDescribe || pa.sourceState == sourceStateWaitingDescribe ||
pa.hasClientsNotSources() { pa.hasReadPublishersNotSources() {
return return
} }
@ -756,7 +756,7 @@ func (pa *Path) scheduleRunOnDemandClose() {
func (pa *Path) scheduleClose() { func (pa *Path) scheduleClose() {
if pa.conf.Regexp != nil && if pa.conf.Regexp != nil &&
!pa.hasClients() && !pa.hasReadPublishers() &&
pa.source == nil && pa.source == nil &&
pa.sourceState != sourceStateWaitingDescribe && pa.sourceState != sourceStateWaitingDescribe &&
!pa.sourceCloseTimerStarted && !pa.sourceCloseTimerStarted &&
@ -795,37 +795,37 @@ func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) {
} }
// OnPathManDescribe is called by pathman.PathMan. // OnPathManDescribe is called by pathman.PathMan.
func (pa *Path) OnPathManDescribe(req client.DescribeReq) { func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) {
pa.clientDescribe <- req pa.clientDescribe <- req
} }
// OnPathManSetupPlay is called by pathman.PathMan. // OnPathManSetupPlay is called by pathman.PathMan.
func (pa *Path) OnPathManSetupPlay(req client.SetupPlayReq) { func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) {
pa.clientSetupPlay <- req pa.clientSetupPlay <- req
} }
// OnPathManAnnounce is called by pathman.PathMan. // OnPathManAnnounce is called by pathman.PathMan.
func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) { func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) {
pa.clientAnnounce <- req pa.clientAnnounce <- req
} }
// OnClientRemove is called by a client. // OnReadPublisherRemove is called by a readpublisher.
func (pa *Path) OnClientRemove(req client.RemoveReq) { func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
pa.clientRemove <- req pa.clientRemove <- req
} }
// OnClientPlay is called by a client. // OnReadPublisherPlay is called by a readpublisher.
func (pa *Path) OnClientPlay(req client.PlayReq) { func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) {
pa.clientPlay <- req pa.clientPlay <- req
} }
// OnClientRecord is called by a client. // OnReadPublisherRecord is called by a readpublisher.
func (pa *Path) OnClientRecord(req client.RecordReq) { func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) {
pa.clientRecord <- req pa.clientRecord <- req
} }
// OnClientPause is called by a client. // OnReadPublisherPause is called by a readpublisher.
func (pa *Path) OnClientPause(req client.PauseReq) { func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) {
pa.clientPause <- req pa.clientPause <- req
} }

50
internal/pathman/pathman.go

@ -7,10 +7,10 @@ import (
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/path" "github.com/aler9/rtsp-simple-server/internal/path"
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
) )
@ -37,9 +37,9 @@ type PathManager struct {
// in // in
confReload chan map[string]*conf.PathConf confReload chan map[string]*conf.PathConf
pathClose chan *path.Path pathClose chan *path.Path
clientDescribe chan client.DescribeReq clientDescribe chan readpublisher.DescribeReq
clientSetupPlay chan client.SetupPlayReq clientSetupPlay chan readpublisher.SetupPlayReq
clientAnnounce chan client.AnnounceReq clientAnnounce chan readpublisher.AnnounceReq
terminate chan struct{} terminate chan struct{}
// out // out
@ -71,9 +71,9 @@ func New(
paths: make(map[string]*path.Path), paths: make(map[string]*path.Path),
confReload: make(chan map[string]*conf.PathConf), confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path.Path), pathClose: make(chan *path.Path),
clientDescribe: make(chan client.DescribeReq), clientDescribe: make(chan readpublisher.DescribeReq),
clientSetupPlay: make(chan client.SetupPlayReq), clientSetupPlay: make(chan readpublisher.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq), clientAnnounce: make(chan readpublisher.AnnounceReq),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -145,11 +145,11 @@ outer:
case req := <-pm.clientDescribe: case req := <-pm.clientDescribe:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
continue continue
} }
err = req.Client.Authenticate( err = req.ReadPublisher.Authenticate(
pm.authMethods, pm.authMethods,
req.PathName, req.PathName,
pathConf.ReadIpsParsed, pathConf.ReadIpsParsed,
@ -157,7 +157,7 @@ outer:
pathConf.ReadPass, pathConf.ReadPass,
req.Data) req.Data)
if err != nil { if err != nil {
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
continue continue
} }
@ -171,11 +171,11 @@ outer:
case req := <-pm.clientSetupPlay: case req := <-pm.clientSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
continue continue
} }
err = req.Client.Authenticate( err = req.ReadPublisher.Authenticate(
pm.authMethods, pm.authMethods,
req.PathName, req.PathName,
pathConf.ReadIpsParsed, pathConf.ReadIpsParsed,
@ -183,7 +183,7 @@ outer:
pathConf.ReadPass, pathConf.ReadPass,
req.Data) req.Data)
if err != nil { if err != nil {
req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
continue continue
} }
@ -197,11 +197,11 @@ outer:
case req := <-pm.clientAnnounce: case req := <-pm.clientAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- client.AnnounceRes{nil, err} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
continue continue
} }
err = req.Client.Authenticate( err = req.ReadPublisher.Authenticate(
pm.authMethods, pm.authMethods,
req.PathName, req.PathName,
pathConf.PublishIpsParsed, pathConf.PublishIpsParsed,
@ -209,7 +209,7 @@ outer:
pathConf.PublishPass, pathConf.PublishPass,
req.Data) req.Data)
if err != nil { if err != nil {
req.Res <- client.AnnounceRes{nil, err} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
continue continue
} }
@ -242,19 +242,19 @@ outer:
if !ok { if !ok {
return return
} }
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientSetupPlay: case req, ok := <-pm.clientSetupPlay:
if !ok { if !ok {
return return
} }
req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientAnnounce: case req, ok := <-pm.clientAnnounce:
if !ok { if !ok {
return return
} }
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
} }
} }
}() }()
@ -325,17 +325,17 @@ func (pm *PathManager) OnPathClose(pa *path.Path) {
pm.pathClose <- pa pm.pathClose <- pa
} }
// OnClientDescribe is called by clientman.ClientMan. // OnReadPublisherDescribe is called by clientman.ClientMan.
func (pm *PathManager) OnClientDescribe(req client.DescribeReq) { func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) {
pm.clientDescribe <- req pm.clientDescribe <- req
} }
// OnClientAnnounce is called by clientman.ClientMan. // OnReadPublisherAnnounce is called by clientman.ClientMan.
func (pm *PathManager) OnClientAnnounce(req client.AnnounceReq) { func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) {
pm.clientAnnounce <- req pm.clientAnnounce <- req
} }
// OnClientSetupPlay is called by clientman.ClientMan. // OnReadPublisherSetupPlay is called by clientman.ClientMan.
func (pm *PathManager) OnClientSetupPlay(req client.SetupPlayReq) { func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
pm.clientSetupPlay <- req pm.clientSetupPlay <- req
} }

58
internal/client/client.go → internal/readpublisher/readpublisher.go

@ -1,4 +1,4 @@
package client package readpublisher
import ( import (
"fmt" "fmt"
@ -45,10 +45,10 @@ func (ErrAuthCritical) Error() string {
type Path interface { type Path interface {
Name() string Name() string
Conf() *conf.PathConf Conf() *conf.PathConf
OnClientRemove(RemoveReq) OnReadPublisherRemove(RemoveReq)
OnClientPlay(PlayReq) OnReadPublisherPlay(PlayReq)
OnClientRecord(RecordReq) OnReadPublisherRecord(RecordReq)
OnClientPause(PauseReq) OnReadPublisherPause(PauseReq)
} }
// DescribeRes is a describe response. // DescribeRes is a describe response.
@ -60,10 +60,10 @@ type DescribeRes struct {
// DescribeReq is a describe request. // DescribeReq is a describe request.
type DescribeReq struct { type DescribeReq struct {
Client Client ReadPublisher ReadPublisher
PathName string PathName string
Data *base.Request Data *base.Request
Res chan DescribeRes Res chan DescribeRes
} }
// SetupPlayRes is a setup/play response. // SetupPlayRes is a setup/play response.
@ -75,10 +75,10 @@ type SetupPlayRes struct {
// SetupPlayReq is a setup/play request. // SetupPlayReq is a setup/play request.
type SetupPlayReq struct { type SetupPlayReq struct {
Client Client ReadPublisher ReadPublisher
PathName string PathName string
Data interface{} Data interface{}
Res chan SetupPlayRes Res chan SetupPlayRes
} }
// AnnounceRes is a announce response. // AnnounceRes is a announce response.
@ -89,17 +89,17 @@ type AnnounceRes struct {
// AnnounceReq is a announce request. // AnnounceReq is a announce request.
type AnnounceReq struct { type AnnounceReq struct {
Client Client ReadPublisher ReadPublisher
PathName string PathName string
Tracks gortsplib.Tracks Tracks gortsplib.Tracks
Data interface{} Data interface{}
Res chan AnnounceRes Res chan AnnounceRes
} }
// RemoveReq is a remove request. // RemoveReq is a remove request.
type RemoveReq struct { type RemoveReq struct {
Client Client ReadPublisher ReadPublisher
Res chan struct{} Res chan struct{}
} }
// PlayRes is a play response. // PlayRes is a play response.
@ -109,8 +109,8 @@ type PlayRes struct {
// PlayReq is a play request. // PlayReq is a play request.
type PlayReq struct { type PlayReq struct {
Client Client ReadPublisher ReadPublisher
Res chan PlayRes Res chan PlayRes
} }
// RecordRes is a record response. // RecordRes is a record response.
@ -121,19 +121,19 @@ type RecordRes struct {
// RecordReq is a record request. // RecordReq is a record request.
type RecordReq struct { type RecordReq struct {
Client Client ReadPublisher ReadPublisher
Res chan RecordRes Res chan RecordRes
} }
// PauseReq is a pause request. // PauseReq is a pause request.
type PauseReq struct { type PauseReq struct {
Client Client ReadPublisher ReadPublisher
Res chan struct{} Res chan struct{}
} }
// Client is implemented by all client*. // ReadPublisher is an entity that can read/publish from/to a path.
type Client interface { type ReadPublisher interface {
IsClient() IsReadPublisher()
IsSource() IsSource()
Close() Close()
CloseRequest() CloseRequest()

2
internal/rtmp/client.go

@ -11,7 +11,7 @@ import (
// DialContext connects to a server in reading mode. // DialContext connects to a server in reading mode.
func DialContext(ctx context.Context, address string) (*Conn, error) { func DialContext(ctx context.Context, address string) (*Conn, error) {
// https://github.com/aler9/rtmp/blob/master/format/rtmp/client.go#L74 // https://github.com/aler9/rtmp/blob/master/format/rtmp/readpublisher.go#L74
u, err := url.Parse(address) u, err := url.Parse(address)
if err != nil { if err != nil {

2
internal/serverhls/server.go

@ -206,7 +206,7 @@ func (s *Server) doClientClose(c *clienthls.Client) {
c.Close() c.Close()
} }
// OnClientClose is called by a client. // OnClientClose is called by a readpublisher.
func (s *Server) OnClientClose(c *clienthls.Client) { func (s *Server) OnClientClose(c *clienthls.Client) {
s.clientClose <- c s.clientClose <- c
} }

2
internal/serverrtmp/server.go

@ -186,7 +186,7 @@ func (s *Server) doClientClose(c *clientrtmp.Client) {
c.Close() c.Close()
} }
// OnClientClose is called by a client. // OnClientClose is called by a readpublisher.
func (s *Server) OnClientClose(c *clientrtmp.Client) { func (s *Server) OnClientClose(c *clientrtmp.Client) {
s.clientClose <- c s.clientClose <- c
} }

2
internal/serverrtsp/server.go

@ -230,7 +230,7 @@ func (s *Server) doClientClose(c *clientrtsp.Client) {
c.Close() c.Close()
} }
// OnClientClose is called by a client. // OnClientClose is called by a readpublisher.
func (s *Server) OnClientClose(c *clientrtsp.Client) { func (s *Server) OnClientClose(c *clientrtsp.Client) {
s.clientClose <- c s.clientClose <- c
} }

4
internal/source/source.go

@ -4,12 +4,12 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
// Source is implemented by all sources (clients and external sources). // Source is a source.
type Source interface { type Source interface {
IsSource() IsSource()
} }
// ExtSource is implemented by all external sources. // ExtSource is an external source.
type ExtSource interface { type ExtSource interface {
IsSource() IsSource()
IsExtSource() IsExtSource()

Loading…
Cancel
Save