Browse Source

RTMP client: speed up initialization by moving code into the client routine

pull/346/head
aler9 4 years ago
parent
commit
de378e0ed5
  1. 10
      internal/clientman/clientman.go
  2. 4
      internal/clientrtmp/client.go
  3. 11
      internal/serverrtmp/server.go

10
internal/clientman/clientman.go

@ -1,6 +1,7 @@
package clientman package clientman
import ( import (
"net"
"sync" "sync"
"time" "time"
@ -11,7 +12,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/clientrtmp" "github.com/aler9/rtsp-simple-server/internal/clientrtmp"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp" "github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
"github.com/aler9/rtsp-simple-server/internal/serverrtmp" "github.com/aler9/rtsp-simple-server/internal/serverrtmp"
"github.com/aler9/rtsp-simple-server/internal/serverrtsp" "github.com/aler9/rtsp-simple-server/internal/serverrtsp"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
@ -126,11 +126,11 @@ func (cm *ClientManager) run() {
return make(chan *gortsplib.ServerConn) return make(chan *gortsplib.ServerConn)
}() }()
rtmpAccept := func() chan *rtmp.Conn { rtmpAccept := func() chan net.Conn {
if cm.serverRTMP != nil { if cm.serverRTMP != nil {
return cm.serverRTMP.Accept() return cm.serverRTMP.Accept()
} }
return make(chan *rtmp.Conn) return make(chan net.Conn)
}() }()
outer: outer:
@ -166,7 +166,7 @@ outer:
cm) cm)
cm.clients[c] = struct{}{} cm.clients[c] = struct{}{}
case conn := <-rtmpAccept: case nconn := <-rtmpAccept:
c := clientrtmp.New( c := clientrtmp.New(
cm.rtspPort, cm.rtspPort,
cm.readTimeout, cm.readTimeout,
@ -176,7 +176,7 @@ outer:
cm.runOnConnectRestart, cm.runOnConnectRestart,
&cm.wg, &cm.wg,
cm.stats, cm.stats,
conn, nconn,
cm.pathMan, cm.pathMan,
cm) cm)
cm.clients[c] = struct{}{} cm.clients[c] = struct{}{}

4
internal/clientrtmp/client.go

@ -105,7 +105,7 @@ func New(
runOnConnectRestart bool, runOnConnectRestart bool,
wg *sync.WaitGroup, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
conn *rtmp.Conn, nconn net.Conn,
pathMan PathMan, pathMan PathMan,
parent Parent) *Client { parent Parent) *Client {
@ -118,7 +118,7 @@ func New(
runOnConnectRestart: runOnConnectRestart, runOnConnectRestart: runOnConnectRestart,
wg: wg, wg: wg,
stats: stats, stats: stats,
conn: conn, conn: rtmp.NewServerConn(nconn),
pathMan: pathMan, pathMan: pathMan,
parent: parent, parent: parent,
terminate: make(chan struct{}), terminate: make(chan struct{}),

11
internal/serverrtmp/server.go

@ -6,7 +6,6 @@ import (
"sync/atomic" "sync/atomic"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
) )
// Parent is implemented by program. // Parent is implemented by program.
@ -22,7 +21,7 @@ type Server struct {
closed uint32 closed uint32
// out // out
accept chan *rtmp.Conn accept chan net.Conn
done chan struct{} done chan struct{}
} }
@ -41,7 +40,7 @@ func New(
s := &Server{ s := &Server{
parent: parent, parent: parent,
l: l, l: l,
accept: make(chan *rtmp.Conn), accept: make(chan net.Conn),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -60,7 +59,7 @@ func (s *Server) log(level logger.Level, format string, args ...interface{}) {
func (s *Server) Close() { func (s *Server) Close() {
go func() { go func() {
for co := range s.accept { for co := range s.accept {
co.NetConn().Close() co.Close()
} }
}() }()
atomic.StoreUint32(&s.closed, 1) atomic.StoreUint32(&s.closed, 1)
@ -81,13 +80,13 @@ func (s *Server) run() {
continue continue
} }
s.accept <- rtmp.NewServerConn(nconn) s.accept <- nconn
} }
close(s.accept) close(s.accept)
} }
// Accept returns a channel to accept incoming connections. // Accept returns a channel to accept incoming connections.
func (s *Server) Accept() chan *rtmp.Conn { func (s *Server) Accept() chan net.Conn {
return s.accept return s.accept
} }

Loading…
Cancel
Save