Browse Source

expose path manager to clients

pull/340/head
aler9 5 years ago
parent
commit
3636f0106a
  1. 22
      internal/clientman/clientman.go
  2. 15
      internal/clientrtmp/client.go
  3. 19
      internal/clientrtsp/client.go

22
internal/clientman/clientman.go

@ -30,7 +30,7 @@ type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
} }
// ClientManager is a clientrtsp.Client manager. // ClientManager is a client manager.
type ClientManager struct { type ClientManager struct {
rtspPort int rtspPort int
readTimeout time.Duration readTimeout time.Duration
@ -147,6 +147,7 @@ outer:
&cm.wg, &cm.wg,
cm.stats, cm.stats,
conn, conn,
cm.pathMan,
cm) cm)
cm.clients[c] = struct{}{} cm.clients[c] = struct{}{}
@ -161,6 +162,7 @@ outer:
&cm.wg, &cm.wg,
cm.stats, cm.stats,
conn, conn,
cm.pathMan,
cm) cm)
cm.clients[c] = struct{}{} cm.clients[c] = struct{}{}
@ -175,6 +177,7 @@ outer:
&cm.wg, &cm.wg,
cm.stats, cm.stats,
conn, conn,
cm.pathMan,
cm) cm)
cm.clients[c] = struct{}{} cm.clients[c] = struct{}{}
@ -219,22 +222,7 @@ outer:
close(cm.clientClose) close(cm.clientClose)
} }
// OnClientClose is called by clientrtsp.Client. // OnClientClose is called by a client.
func (cm *ClientManager) OnClientClose(c client.Client) { func (cm *ClientManager) OnClientClose(c client.Client) {
cm.clientClose <- c cm.clientClose <- c
} }
// OnClientDescribe is called by clientrtsp.Client.
func (cm *ClientManager) OnClientDescribe(req client.DescribeReq) {
cm.pathMan.OnClientDescribe(req)
}
// OnClientAnnounce is called by clientrtsp.Client.
func (cm *ClientManager) OnClientAnnounce(req client.AnnounceReq) {
cm.pathMan.OnClientAnnounce(req)
}
// OnClientSetupPlay is called by clientrtsp.Client.
func (cm *ClientManager) OnClientSetupPlay(req client.SetupPlayReq) {
cm.pathMan.OnClientSetupPlay(req)
}

15
internal/clientrtmp/client.go

@ -62,12 +62,16 @@ type trackIDBufPair struct {
buf []byte buf []byte
} }
// PathMan is implemented by pathman.PathMan.
type PathMan interface {
OnClientSetupPlay(client.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq)
}
// Parent is implemented by clientman.ClientMan. // Parent is implemented by clientman.ClientMan.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(client.Client) OnClientClose(client.Client)
OnClientSetupPlay(client.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq)
} }
// Client is a RTMP client. // Client is a RTMP client.
@ -81,6 +85,7 @@ type Client struct {
stats *stats.Stats stats *stats.Stats
wg *sync.WaitGroup wg *sync.WaitGroup
conn *rtmputils.Conn conn *rtmputils.Conn
pathMan PathMan
parent Parent parent Parent
// read mode only // read mode only
@ -105,6 +110,7 @@ func New(
wg *sync.WaitGroup, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
conn *rtmputils.Conn, conn *rtmputils.Conn,
pathMan PathMan,
parent Parent) *Client { parent Parent) *Client {
c := &Client{ c := &Client{
@ -117,6 +123,7 @@ func New(
wg: wg, wg: wg,
stats: stats, stats: stats,
conn: conn, conn: conn,
pathMan: pathMan,
parent: parent, parent: parent,
terminate: make(chan struct{}), terminate: make(chan struct{}),
} }
@ -177,7 +184,7 @@ func (c *Client) runRead() {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.SetupPlayRes) resc := make(chan client.SetupPlayRes)
c.parent.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, resc}) //nolint:govet c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
@ -450,7 +457,7 @@ func (c *Client) runPublish() {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.AnnounceRes) resc := make(chan client.AnnounceRes)
c.parent.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet c.pathMan.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {

19
internal/clientrtsp/client.go

@ -45,13 +45,17 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
return false return false
} }
// PathMan is implemented by pathman.PathMan.
type PathMan interface {
OnClientDescribe(client.DescribeReq)
OnClientSetupPlay(client.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq)
}
// Parent is implemented by clientman.ClientMan. // Parent is implemented by clientman.ClientMan.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(client.Client) OnClientClose(client.Client)
OnClientDescribe(client.DescribeReq)
OnClientAnnounce(client.AnnounceReq)
OnClientSetupPlay(client.SetupPlayReq)
} }
// Client is a RTSP client. // Client is a RTSP client.
@ -64,6 +68,7 @@ type Client struct {
wg *sync.WaitGroup wg *sync.WaitGroup
stats *stats.Stats stats *stats.Stats
conn *gortsplib.ServerConn conn *gortsplib.ServerConn
pathMan PathMan
parent Parent parent Parent
path client.Path path client.Path
@ -95,6 +100,7 @@ func New(
wg *sync.WaitGroup, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
conn *gortsplib.ServerConn, conn *gortsplib.ServerConn,
pathMan PathMan,
parent Parent) *Client { parent Parent) *Client {
c := &Client{ c := &Client{
@ -106,6 +112,7 @@ func New(
wg: wg, wg: wg,
stats: stats, stats: stats,
conn: conn, conn: conn,
pathMan: pathMan,
parent: parent, parent: parent,
terminate: make(chan struct{}), terminate: make(chan struct{}),
} }
@ -168,7 +175,7 @@ func (c *Client) run() {
onDescribe := func(ctx *gortsplib.ServerConnDescribeCtx) (*base.Response, []byte, error) { onDescribe := func(ctx *gortsplib.ServerConnDescribeCtx) (*base.Response, []byte, error) {
resc := make(chan client.DescribeRes) resc := make(chan client.DescribeRes)
c.parent.OnClientDescribe(client.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet c.pathMan.OnClientDescribe(client.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
@ -212,7 +219,7 @@ func (c *Client) run() {
onAnnounce := func(ctx *gortsplib.ServerConnAnnounceCtx) (*base.Response, error) { onAnnounce := func(ctx *gortsplib.ServerConnAnnounceCtx) (*base.Response, error) {
resc := make(chan client.AnnounceRes) resc := make(chan client.AnnounceRes)
c.parent.OnClientAnnounce(client.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet c.pathMan.OnClientAnnounce(client.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
@ -260,7 +267,7 @@ func (c *Client) run() {
switch c.conn.State() { switch c.conn.State() {
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
resc := make(chan client.SetupPlayRes) resc := make(chan client.SetupPlayRes)
c.parent.OnClientSetupPlay(client.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {

Loading…
Cancel
Save