Browse Source

move clients and publishers below serverTcpListener

pull/31/head
aler9 5 years ago
parent
commit
e072bdf0b4
  1. 22
      main.go
  2. 80
      server-client.go
  3. 14
      server-tcpl.go
  4. 6
      server-udpl.go

22
main.go

@ -7,7 +7,6 @@ import (
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
@ -60,14 +59,11 @@ type args struct {
} }
type program struct { type program struct {
args args args args
protocols map[streamProtocol]struct{} protocols map[streamProtocol]struct{}
mutex sync.RWMutex rtspl *serverTcpListener
rtspl *serverTcpListener rtpl *serverUdpListener
rtpl *serverUdpListener rtcpl *serverUdpListener
rtcpl *serverUdpListener
clients map[*serverClient]struct{}
publishers map[string]*serverClient
} }
func newProgram(args args) (*program, error) { func newProgram(args args) (*program, error) {
@ -132,10 +128,8 @@ func newProgram(args args) (*program, error) {
log.Printf("rtsp-simple-server %s", Version) log.Printf("rtsp-simple-server %s", Version)
p := &program{ p := &program{
args: args, args: args,
protocols: protocols, protocols: protocols,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]*serverClient),
} }
var err error var err error
@ -163,7 +157,7 @@ func newProgram(args args) (*program, error) {
} }
func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte) { func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte) {
for c := range p.clients { for c := range p.rtspl.clients {
if c.path == path && c.state == _CLIENT_STATE_PLAY { if c.path == path && c.state == _CLIENT_STATE_PLAY {
if c.streamProtocol == _STREAM_PROTOCOL_UDP { if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if flow == _TRACK_FLOW_RTP { if flow == _TRACK_FLOW_RTP {

80
server-client.go

@ -128,30 +128,30 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
write: make(chan *gortsplib.InterleavedFrame), write: make(chan *gortsplib.InterleavedFrame),
} }
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
c.p.clients[c] = struct{}{} c.p.rtspl.clients[c] = struct{}{}
c.p.mutex.Unlock() c.p.rtspl.mutex.Unlock()
return c return c
} }
func (c *serverClient) close() error { func (c *serverClient) close() error {
// already deleted // already deleted
if _, ok := c.p.clients[c]; !ok { if _, ok := c.p.rtspl.clients[c]; !ok {
return nil return nil
} }
delete(c.p.clients, c) delete(c.p.rtspl.clients, c)
c.conn.NetConn().Close() c.conn.NetConn().Close()
close(c.write) close(c.write)
if c.path != "" { if c.path != "" {
if pub, ok := c.p.publishers[c.path]; ok && pub == c { if pub, ok := c.p.rtspl.publishers[c.path]; ok && pub == c {
delete(c.p.publishers, c.path) delete(c.p.rtspl.publishers, c.path)
// if the publisher has disconnected // if the publisher has disconnected
// close all other connections that share the same path // close all other connections that share the same path
for oc := range c.p.clients { for oc := range c.p.rtspl.clients {
if oc.path == c.path { if oc.path == c.path {
oc.close() oc.close()
} }
@ -189,8 +189,8 @@ func (c *serverClient) run() {
defer c.log("disconnected") defer c.log("disconnected")
defer func() { defer func() {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
c.close() c.close()
}() }()
@ -288,10 +288,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
sdp, err := func() ([]byte, error) { sdp, err := func() ([]byte, error) {
c.p.mutex.RLock() c.p.rtspl.mutex.RLock()
defer c.p.mutex.RUnlock() defer c.p.rtspl.mutex.RUnlock()
pub, ok := c.p.publishers[path] pub, ok := c.p.rtspl.publishers[path]
if !ok { if !ok {
return nil, fmt.Errorf("no one is streaming on path '%s'", path) return nil, fmt.Errorf("no one is streaming on path '%s'", path)
} }
@ -369,16 +369,16 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
sdpParsed, req.Content = sdpFilter(sdpParsed, req.Content) sdpParsed, req.Content = sdpFilter(sdpParsed, req.Content)
err = func() error { err = func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
_, ok := c.p.publishers[path] _, ok := c.p.rtspl.publishers[path]
if ok { if ok {
return fmt.Errorf("another client is already publishing on path '%s'", path) return fmt.Errorf("another client is already publishing on path '%s'", path)
} }
c.path = path c.path = path
c.p.publishers[path] = c c.p.rtspl.publishers[path] = c
c.streamSdpText = req.Content c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed c.streamSdpParsed = sdpParsed
c.state = _CLIENT_STATE_ANNOUNCE c.state = _CLIENT_STATE_ANNOUNCE
@ -443,10 +443,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
pub, ok := c.p.publishers[path] pub, ok := c.p.rtspl.publishers[path]
if !ok { if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path) return fmt.Errorf("no one is streaming on path '%s'", path)
} }
@ -502,10 +502,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
pub, ok := c.p.publishers[path] pub, ok := c.p.rtspl.publishers[path]
if !ok { if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path) return fmt.Errorf("no one is streaming on path '%s'", path)
} }
@ -590,8 +590,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client wants to publish tracks with different protocols") return fmt.Errorf("client wants to publish tracks with different protocols")
@ -639,8 +639,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
var interleaved string var interleaved string
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client wants to publish tracks with different protocols") return fmt.Errorf("client wants to publish tracks with different protocols")
@ -710,10 +710,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
pub, ok := c.p.publishers[c.path] pub, ok := c.p.rtspl.publishers[c.path]
if !ok { if !ok {
return fmt.Errorf("no one is streaming on path '%s'", c.path) return fmt.Errorf("no one is streaming on path '%s'", c.path)
} }
@ -747,9 +747,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return "tracks" return "tracks"
}(), c.streamProtocol) }(), c.streamProtocol)
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
c.state = _CLIENT_STATE_PLAY c.state = _CLIENT_STATE_PLAY
c.p.mutex.Unlock() c.p.rtspl.mutex.Unlock()
// when protocol is TCP, the RTSP connection becomes a RTP connection // when protocol is TCP, the RTSP connection becomes a RTP connection
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == _STREAM_PROTOCOL_TCP {
@ -788,9 +788,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
c.log("paused") c.log("paused")
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
c.state = _CLIENT_STATE_PRE_PLAY c.state = _CLIENT_STATE_PRE_PLAY
c.p.mutex.Unlock() c.p.rtspl.mutex.Unlock()
c.conn.WriteResponse(&gortsplib.Response{ c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK, StatusCode: gortsplib.StatusOK,
@ -813,8 +813,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.rtspl.mutex.Unlock()
if len(c.streamTracks) != len(c.streamSdpParsed.Medias) { if len(c.streamTracks) != len(c.streamSdpParsed.Medias) {
return fmt.Errorf("not all tracks have been setup") return fmt.Errorf("not all tracks have been setup")
@ -835,9 +835,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}, },
}) })
c.p.mutex.Lock() c.p.rtspl.mutex.Lock()
c.state = _CLIENT_STATE_RECORD c.state = _CLIENT_STATE_RECORD
c.p.mutex.Unlock() c.p.rtspl.mutex.Unlock()
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string { c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 { if len(c.streamTracks) == 1 {
@ -863,9 +863,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false return false
} }
c.p.mutex.RLock() c.p.rtspl.mutex.RLock()
c.p.forwardTrack(c.path, trackId, trackFlow, frame.Content) c.p.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.mutex.RUnlock() c.p.rtspl.mutex.RUnlock()
} }
} }

14
server-tcpl.go

@ -3,11 +3,15 @@ package main
import ( import (
"log" "log"
"net" "net"
"sync"
) )
type serverTcpListener struct { type serverTcpListener struct {
p *program p *program
netl *net.TCPListener netl *net.TCPListener
mutex sync.RWMutex
clients map[*serverClient]struct{}
publishers map[string]*serverClient
} }
func newServerTcpListener(p *program) (*serverTcpListener, error) { func newServerTcpListener(p *program) (*serverTcpListener, error) {
@ -19,8 +23,10 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) {
} }
s := &serverTcpListener{ s := &serverTcpListener{
p: p, p: p,
netl: netl, netl: netl,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]*serverClient),
} }
s.log("opened on :%d", p.args.rtspPort) s.log("opened on :%d", p.args.rtspPort)

6
server-udpl.go

@ -68,12 +68,12 @@ func (l *serverUdpListener) run() {
} }
func() { func() {
l.p.mutex.RLock() l.p.rtspl.mutex.RLock()
defer l.p.mutex.RUnlock() defer l.p.rtspl.mutex.RUnlock()
// find path and track id from ip and port // find path and track id from ip and port
path, trackId := func() (string, int) { path, trackId := func() (string, int) {
for _, pub := range l.p.publishers { for _, pub := range l.p.rtspl.publishers {
for i, t := range pub.streamTracks { for i, t := range pub.streamTracks {
if !pub.ip().Equal(addr.IP) { if !pub.ip().Equal(addr.IP) {
continue continue

Loading…
Cancel
Save