Browse Source

rename variables

pull/31/head
aler9 5 years ago
parent
commit
1931e53992
  1. 18
      main.go
  2. 82
      server-client.go
  3. 12
      server-tcpl.go
  4. 8
      server-udpl.go

18
main.go

@ -59,9 +59,9 @@ type args struct { @@ -59,9 +59,9 @@ type args struct {
type program struct {
args args
protocols map[streamProtocol]struct{}
rtspl *serverTcpListener
rtpl *serverUdpListener
rtcpl *serverUdpListener
tcpl *serverTcpListener
udplRtp *serverUdpListener
udplRtcp *serverUdpListener
}
func newProgram(args args) (*program, error) {
@ -132,24 +132,24 @@ func newProgram(args args) (*program, error) { @@ -132,24 +132,24 @@ func newProgram(args args) (*program, error) {
var err error
p.rtpl, err = newServerUdpListener(p, args.rtpPort, _TRACK_FLOW_RTP)
p.udplRtp, err = newServerUdpListener(p, args.rtpPort, _TRACK_FLOW_RTP)
if err != nil {
return nil, err
}
p.rtcpl, err = newServerUdpListener(p, args.rtcpPort, _TRACK_FLOW_RTCP)
p.udplRtcp, err = newServerUdpListener(p, args.rtcpPort, _TRACK_FLOW_RTCP)
if err != nil {
return nil, err
}
p.rtspl, err = newServerTcpListener(p)
p.tcpl, err = newServerTcpListener(p)
if err != nil {
return nil, err
}
go p.rtpl.run()
go p.rtcpl.run()
go p.rtspl.run()
go p.udplRtp.run()
go p.udplRtcp.run()
go p.tcpl.run()
return p, nil
}

82
server-client.go

@ -128,30 +128,30 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { @@ -128,30 +128,30 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
write: make(chan *gortsplib.InterleavedFrame),
}
c.p.rtspl.mutex.Lock()
c.p.rtspl.clients[c] = struct{}{}
c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
c.p.tcpl.clients[c] = struct{}{}
c.p.tcpl.mutex.Unlock()
return c
}
func (c *serverClient) close() error {
// already deleted
if _, ok := c.p.rtspl.clients[c]; !ok {
if _, ok := c.p.tcpl.clients[c]; !ok {
return nil
}
delete(c.p.rtspl.clients, c)
delete(c.p.tcpl.clients, c)
c.conn.NetConn().Close()
close(c.write)
if c.path != "" {
if pub, ok := c.p.rtspl.publishers[c.path]; ok && pub == c {
delete(c.p.rtspl.publishers, c.path)
if pub, ok := c.p.tcpl.publishers[c.path]; ok && pub == c {
delete(c.p.tcpl.publishers, c.path)
// if the publisher has disconnected
// close all other connections that share the same path
for oc := range c.p.rtspl.clients {
for oc := range c.p.tcpl.clients {
if oc.path == c.path {
oc.close()
}
@ -189,8 +189,8 @@ func (c *serverClient) run() { @@ -189,8 +189,8 @@ func (c *serverClient) run() {
defer c.log("disconnected")
defer func() {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
c.close()
}()
@ -288,10 +288,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -288,10 +288,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
sdp, err := func() ([]byte, error) {
c.p.rtspl.mutex.RLock()
defer c.p.rtspl.mutex.RUnlock()
c.p.tcpl.mutex.RLock()
defer c.p.tcpl.mutex.RUnlock()
pub, ok := c.p.rtspl.publishers[path]
pub, ok := c.p.tcpl.publishers[path]
if !ok {
return nil, fmt.Errorf("no one is streaming on path '%s'", path)
}
@ -369,16 +369,16 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -369,16 +369,16 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
sdpParsed, req.Content = sdpFilter(sdpParsed, req.Content)
err = func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
_, ok := c.p.rtspl.publishers[path]
_, ok := c.p.tcpl.publishers[path]
if ok {
return fmt.Errorf("another client is already publishing on path '%s'", path)
}
c.path = path
c.p.rtspl.publishers[path] = c
c.p.tcpl.publishers[path] = c
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.state = _CLIENT_STATE_ANNOUNCE
@ -443,10 +443,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -443,10 +443,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
pub, ok := c.p.rtspl.publishers[path]
pub, ok := c.p.tcpl.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
@ -502,10 +502,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -502,10 +502,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
pub, ok := c.p.rtspl.publishers[path]
pub, ok := c.p.tcpl.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
@ -590,8 +590,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -590,8 +590,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client wants to publish tracks with different protocols")
@ -639,8 +639,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -639,8 +639,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
var interleaved string
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client wants to publish tracks with different protocols")
@ -710,10 +710,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -710,10 +710,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
pub, ok := c.p.rtspl.publishers[c.path]
pub, ok := c.p.tcpl.publishers[c.path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", c.path)
}
@ -747,9 +747,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -747,9 +747,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return "tracks"
}(), c.streamProtocol)
c.p.rtspl.mutex.Lock()
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_PLAY
c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Unlock()
// when protocol is TCP, the RTSP connection becomes a RTP connection
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
@ -788,9 +788,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -788,9 +788,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
c.log("paused")
c.p.rtspl.mutex.Lock()
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_PRE_PLAY
c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Unlock()
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
@ -813,8 +813,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -813,8 +813,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
err := func() error {
c.p.rtspl.mutex.Lock()
defer c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
if len(c.streamTracks) != len(c.streamSdpParsed.Medias) {
return fmt.Errorf("not all tracks have been setup")
@ -835,9 +835,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -835,9 +835,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
},
})
c.p.rtspl.mutex.Lock()
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_RECORD
c.p.rtspl.mutex.Unlock()
c.p.tcpl.mutex.Unlock()
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
@ -863,9 +863,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -863,9 +863,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
c.p.rtspl.mutex.RLock()
c.p.rtspl.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.rtspl.mutex.RUnlock()
c.p.tcpl.mutex.RLock()
c.p.tcpl.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.tcpl.mutex.RUnlock()
}
}

12
server-tcpl.go

@ -10,14 +10,14 @@ import ( @@ -10,14 +10,14 @@ import (
type serverTcpListener struct {
p *program
netl *net.TCPListener
nconn *net.TCPListener
mutex sync.RWMutex
clients map[*serverClient]struct{}
publishers map[string]*serverClient
}
func newServerTcpListener(p *program) (*serverTcpListener, error) {
netl, err := net.ListenTCP("tcp", &net.TCPAddr{
nconn, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: p.args.rtspPort,
})
if err != nil {
@ -26,7 +26,7 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) { @@ -26,7 +26,7 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) {
s := &serverTcpListener{
p: p,
netl: netl,
nconn: nconn,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]*serverClient),
}
@ -41,7 +41,7 @@ func (l *serverTcpListener) log(format string, args ...interface{}) { @@ -41,7 +41,7 @@ func (l *serverTcpListener) log(format string, args ...interface{}) {
func (l *serverTcpListener) run() {
for {
nconn, err := l.netl.AcceptTCP()
nconn, err := l.nconn.AcceptTCP()
if err != nil {
break
}
@ -56,7 +56,7 @@ func (l *serverTcpListener) forwardTrack(path string, id int, flow trackFlow, fr @@ -56,7 +56,7 @@ func (l *serverTcpListener) forwardTrack(path string, id int, flow trackFlow, fr
if c.path == path && c.state == _CLIENT_STATE_PLAY {
if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if flow == _TRACK_FLOW_RTP {
l.p.rtpl.write <- &udpWrite{
l.p.udplRtp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
@ -65,7 +65,7 @@ func (l *serverTcpListener) forwardTrack(path string, id int, flow trackFlow, fr @@ -65,7 +65,7 @@ func (l *serverTcpListener) forwardTrack(path string, id int, flow trackFlow, fr
buf: frame,
}
} else {
l.p.rtcpl.write <- &udpWrite{
l.p.udplRtcp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),

8
server-udpl.go

@ -68,12 +68,12 @@ func (l *serverUdpListener) run() { @@ -68,12 +68,12 @@ func (l *serverUdpListener) run() {
}
func() {
l.p.rtspl.mutex.RLock()
defer l.p.rtspl.mutex.RUnlock()
l.p.tcpl.mutex.RLock()
defer l.p.tcpl.mutex.RUnlock()
// find path and track id from ip and port
path, trackId := func() (string, int) {
for _, pub := range l.p.rtspl.publishers {
for _, pub := range l.p.tcpl.publishers {
for i, t := range pub.streamTracks {
if !pub.ip().Equal(addr.IP) {
continue
@ -96,7 +96,7 @@ func (l *serverUdpListener) run() { @@ -96,7 +96,7 @@ func (l *serverUdpListener) run() {
return
}
l.p.rtspl.forwardTrack(path, trackId, l.flow, buf[:n])
l.p.tcpl.forwardTrack(path, trackId, l.flow, buf[:n])
}()
}

Loading…
Cancel
Save