Browse Source

add ticker to check whether udp packets are being received or not while publishing; fix #15

pull/31/head
aler9 5 years ago
parent
commit
60371afc1a
  1. 71
      server-client.go
  2. 29
      server-udpl.go

71
server-client.go

@ -8,11 +8,17 @@ import ( @@ -8,11 +8,17 @@ import (
"net"
"os/exec"
"strings"
"time"
"github.com/aler9/gortsplib"
"gortc.io/sdp"
)
const (
_UDP_CHECK_STREAM_INTERVAL = 5 * time.Second
_UDP_STREAM_DEAD_AFTER = 10 * time.Second
)
func interleavedChannelToTrack(channel uint8) (int, trackFlow) {
if (channel % 2) == 0 {
return int(channel / 2), _TRACK_FLOW_RTP
@ -62,18 +68,20 @@ func (cs clientState) String() string { @@ -62,18 +68,20 @@ func (cs clientState) String() string {
}
type serverClient struct {
p *program
conn *gortsplib.ConnServer
state clientState
path string
publishAuth *gortsplib.AuthServer
readAuth *gortsplib.AuthServer
streamSdpText []byte // filled only if publisher
streamSdpParsed *sdp.Message // filled only if publisher
streamProtocol streamProtocol
streamTracks []*track
write chan *gortsplib.InterleavedFrame
done chan struct{}
p *program
conn *gortsplib.ConnServer
state clientState
path string
publishAuth *gortsplib.AuthServer
readAuth *gortsplib.AuthServer
streamSdpText []byte // filled only if publisher
streamSdpParsed *sdp.Message // filled only if publisher
streamProtocol streamProtocol
streamTracks []*track
udpLastFrameTime time.Time
udpCheckStreamTicker *time.Ticker
write chan *gortsplib.InterleavedFrame
done chan struct{}
}
func newServerClient(p *program, nconn net.Conn) *serverClient {
@ -170,6 +178,10 @@ func (c *serverClient) run() { @@ -170,6 +178,10 @@ func (c *serverClient) run() {
c.close()
}()
if c.udpCheckStreamTicker != nil {
c.udpCheckStreamTicker.Stop()
}
c.log("disconnected")
func() {
@ -262,6 +274,7 @@ func (c *serverClient) validateAuth(req *gortsplib.Request, user string, pass st @@ -262,6 +274,7 @@ func (c *serverClient) validateAuth(req *gortsplib.Request, user string, pass st
return errAuthNotCritical
}
return nil
}()
if err != nil {
@ -871,10 +884,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -871,10 +884,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
},
})
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_RECORD
c.p.tcpl.mutex.Unlock()
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
@ -885,6 +894,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -885,6 +894,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_RECORD
c.p.tcpl.mutex.Unlock()
for {
frame, err := c.conn.ReadInterleavedFrame()
if err != nil {
@ -905,6 +918,32 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { @@ -905,6 +918,32 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
c.p.tcpl.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.tcpl.mutex.RUnlock()
}
} else {
c.p.tcpl.mutex.Lock()
c.state = _CLIENT_STATE_RECORD
c.udpLastFrameTime = time.Now()
c.udpCheckStreamTicker = time.NewTicker(_UDP_CHECK_STREAM_INTERVAL)
c.p.tcpl.mutex.Unlock()
go func() {
for range c.udpCheckStreamTicker.C {
ok := func() bool {
c.p.tcpl.mutex.Lock()
defer c.p.tcpl.mutex.Unlock()
if time.Since(c.udpLastFrameTime) >= _UDP_STREAM_DEAD_AFTER {
return false
}
return true
}()
if !ok {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
break
}
}
}()
}
return true

29
server-udpl.go

@ -68,35 +68,38 @@ func (l *serverUdpListener) run() { @@ -68,35 +68,38 @@ func (l *serverUdpListener) run() {
}
func() {
l.p.tcpl.mutex.RLock()
defer l.p.tcpl.mutex.RUnlock()
l.p.tcpl.mutex.Lock()
defer l.p.tcpl.mutex.Unlock()
// find path and track id from ip and port
path, trackId := func() (string, int) {
// find publisher and track id from ip and port
pub, trackId := func() (*serverClient, int) {
for _, pub := range l.p.tcpl.publishers {
for i, t := range pub.streamTracks {
if !pub.ip().Equal(addr.IP) {
continue
}
if pub.streamProtocol != _STREAM_PROTOCOL_UDP ||
pub.state != _CLIENT_STATE_RECORD ||
!pub.ip().Equal(addr.IP) {
continue
}
for i, t := range pub.streamTracks {
if l.flow == _TRACK_FLOW_RTP {
if t.rtpPort == addr.Port {
return pub.path, i
return pub, i
}
} else {
if t.rtcpPort == addr.Port {
return pub.path, i
return pub, i
}
}
}
}
return "", -1
return nil, -1
}()
if path == "" {
if pub == nil {
return
}
l.p.tcpl.forwardTrack(path, trackId, l.flow, buf[:n])
pub.udpLastFrameTime = time.Now()
l.p.tcpl.forwardTrack(pub.path, trackId, l.flow, buf[:n])
}()
}

Loading…
Cancel
Save