Browse Source

fix panic when a client disconnects

pull/2/head v0.1.2
aler9 6 years ago
parent
commit
9a6a813640
  1. 16
      main.go
  2. 16
      rtsp_client.go
  3. 8
      rtsp_listener.go
  4. 5
      udp_listener.go

16
main.go

@ -55,18 +55,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
} }
func (p *program) run() { func (p *program) run() {
var wg sync.WaitGroup go p.rtpl.run()
go p.rtcpl.run()
go p.rtspl.run()
wg.Add(1) infty := make(chan struct{})
go p.rtpl.run(wg) <-infty
wg.Add(1)
go p.rtcpl.run(wg)
wg.Add(1)
go p.rtspl.run(wg)
wg.Wait()
} }
func (p *program) handleRtp(buf []byte) { func (p *program) handleRtp(buf []byte) {

16
rtsp_client.go

@ -10,7 +10,6 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"rtsp-server/rtsp" "rtsp-server/rtsp"
) )
@ -40,20 +39,26 @@ func newRtspClient(p *program, nconn net.Conn) *rtspClient {
} }
func (c *rtspClient) close() error { func (c *rtspClient) close() error {
// already deleted
if _, ok := c.p.clients[c]; !ok {
return nil
}
delete(c.p.clients, c) delete(c.p.clients, c)
c.nconn.Close()
if c.p.streamAuthor == c { if c.p.streamAuthor == c {
c.p.streamAuthor = nil c.p.streamAuthor = nil
c.p.streamSdp = nil c.p.streamSdp = nil
// if the streamer has disconnected // if the publisher has disconnected
// close all other connections // close all other connections
for oc := range c.p.clients { for oc := range c.p.clients {
oc.close() oc.close()
} }
} }
return c.nconn.Close() return nil
} }
func (c *rtspClient) log(format string, args ...interface{}) { func (c *rtspClient) log(format string, args ...interface{}) {
@ -61,8 +66,7 @@ func (c *rtspClient) log(format string, args ...interface{}) {
log.Printf(format, args...) log.Printf(format, args...)
} }
func (c *rtspClient) run(wg sync.WaitGroup) { func (c *rtspClient) run() {
defer wg.Done()
defer c.log("disconnected") defer c.log("disconnected")
defer func() { defer func() {
c.p.mutex.Lock() c.p.mutex.Lock()
@ -278,7 +282,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
// use two fake server ports, since we do not want to receive feedback // use two fake server ports, since we do not want to receive feedback
// from the client // from the client
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort + 2, c.p.rtcpPort + 2), fmt.Sprintf("server_port=%d-%d", c.p.rtpPort+2, c.p.rtcpPort+2),
"ssrc=1234ABCD", "ssrc=1234ABCD",
}, ";"), }, ";"),
"Session": "12345678", "Session": "12345678",

8
rtsp_listener.go

@ -3,7 +3,6 @@ package main
import ( import (
"log" "log"
"net" "net"
"sync"
) )
type rtspListener struct { type rtspListener struct {
@ -32,9 +31,7 @@ func (l *rtspListener) log(format string, args ...interface{}) {
log.Printf("[RTSP listener] "+format, args...) log.Printf("[RTSP listener] "+format, args...)
} }
func (l *rtspListener) run(wg sync.WaitGroup) { func (l *rtspListener) run() {
defer wg.Done()
for { for {
nconn, err := l.netl.AcceptTCP() nconn, err := l.netl.AcceptTCP()
if err != nil { if err != nil {
@ -42,7 +39,6 @@ func (l *rtspListener) run(wg sync.WaitGroup) {
} }
rsc := newRtspClient(l.p, nconn) rsc := newRtspClient(l.p, nconn)
wg.Add(1) go rsc.run()
go rsc.run(wg)
} }
} }

5
udp_listener.go

@ -3,7 +3,6 @@ package main
import ( import (
"log" "log"
"net" "net"
"sync"
) )
type udpListener struct { type udpListener struct {
@ -34,9 +33,7 @@ func (l *udpListener) log(format string, args ...interface{}) {
log.Printf("["+l.logPrefix+" listener] "+format, args...) log.Printf("["+l.logPrefix+" listener] "+format, args...)
} }
func (l *udpListener) run(wg sync.WaitGroup) { func (l *udpListener) run() {
defer wg.Done()
buf := make([]byte, 2048) // UDP MTU is 1400 buf := make([]byte, 2048) // UDP MTU is 1400
for { for {

Loading…
Cancel
Save