Browse Source

send TEARDOWN to source when its connection closes (#63)

pull/80/head
aler9 5 years ago
parent
commit
2056689560
  1. 24
      client.go
  2. 2
      go.mod
  3. 4
      go.sum
  4. 66
      source.go

24
client.go

@ -887,14 +887,30 @@ func (c *client) runPlay(path string) { @@ -887,14 +887,30 @@ func (c *client) runPlay(path string) {
} else {
readDone := make(chan error)
go func() {
buf := make([]byte, 2048)
frame := &gortsplib.InterleavedFrame{}
readBuf := make([]byte, clientTcpReadBufferSize)
for {
_, err := c.conn.NetConn().Read(buf)
frame.Content = readBuf
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame, false)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
// rtcp feedback is handled by gortsplib
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
@ -902,7 +918,7 @@ func (c *client) runPlay(path string) { @@ -902,7 +918,7 @@ func (c *client) runPlay(path string) {
for {
select {
case err := <-readDone:
if err != io.EOF {
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer
@ -1034,7 +1050,7 @@ func (c *client) runRecord(path string) { @@ -1034,7 +1050,7 @@ func (c *client) runRecord(path string) {
frame.Content = readBuf.next()
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame)
recv, err := c.conn.ReadFrameOrRequest(frame, true)
if err != nil {
readDone <- err
break

2
go.mod

@ -5,7 +5,7 @@ go 1.12 @@ -5,7 +5,7 @@ go 1.12
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637
github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.6.1

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637 h1:nRL12W8of4IVFqFCJRsF4Iy0NQAhSbNdu4+dBzdu3n8=
github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77 h1:brN94N74bmrW9MsURf1QY6Mpph9P9aUUO730BavHshg=
github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

66
source.go

@ -170,16 +170,16 @@ func (s *source) doInner(terminate chan struct{}) bool { @@ -170,16 +170,16 @@ func (s *source) doInner(terminate chan struct{}) bool {
return true
}
defer conn.Close()
_, err = conn.Options(s.confp.sourceUrl)
if err != nil {
conn.Close()
s.log("ERR: %s", err)
return true
}
tracks, _, err := conn.Describe(s.confp.sourceUrl)
if err != nil {
conn.Close()
s.log("ERR: %s", err)
return true
}
@ -199,24 +199,17 @@ func (s *source) doInner(terminate chan struct{}) bool { @@ -199,24 +199,17 @@ func (s *source) doInner(terminate chan struct{}) bool {
}
func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool {
type trackListenerPair struct {
serverRtp *gortsplib.ConnClientUdpListener
serverRtcp *gortsplib.ConnClientUdpListener
}
var listeners []*trackListenerPair
var rtpReads []gortsplib.UdpReadFunc
var rtcpReads []gortsplib.UdpReadFunc
for _, track := range s.tracks {
var serverRtp *gortsplib.ConnClientUdpListener
var serverRtcp *gortsplib.ConnClientUdpListener
var err error
for {
// choose two consecutive ports in range 65536-10000
// rtp must be even and rtcp odd
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
serverRtp, serverRtcp, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort)
rtpRead, rtcpRead, _, err := conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort)
if err != nil {
// retry if it's a bind error
if nerr, ok := err.(*net.OpError); ok {
@ -227,21 +220,20 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo @@ -227,21 +220,20 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
}
}
conn.Close()
s.log("ERR: %s", err)
return true
}
rtpReads = append(rtpReads, rtpRead)
rtcpReads = append(rtcpReads, rtcpRead)
break
}
listeners = append(listeners, &trackListenerPair{
serverRtp: serverRtp,
serverRtcp: serverRtcp,
})
}
_, err := conn.Play(s.confp.sourceUrl)
if err != nil {
conn.Close()
s.log("ERR: %s", err)
return true
}
@ -250,42 +242,44 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo @@ -250,42 +242,44 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
var wg sync.WaitGroup
for trackId, lp := range listeners {
wg.Add(2)
// receive RTP packets
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
// receive RTP packets
for trackId, rtpRead := range rtpReads {
wg.Add(1)
go func(trackId int, rtpRead gortsplib.UdpReadFunc) {
defer wg.Done()
multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize)
for {
buf := multiBuf.next()
n, err := l.Read(buf)
n, err := rtpRead(buf)
if err != nil {
break
}
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]}
}
}(trackId, lp.serverRtp)
}(trackId, rtpRead)
}
// receive RTCP packets
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
// receive RTCP packets
for trackId, rtcpRead := range rtcpReads {
wg.Add(1)
go func(trackId int, rtcpRead gortsplib.UdpReadFunc) {
defer wg.Done()
multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize)
for {
buf := multiBuf.next()
n, err := l.Read(buf)
n, err := rtcpRead(buf)
if err != nil {
break
}
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]}
}
}(trackId, lp.serverRtcp)
}(trackId, rtcpRead)
}
tcpConnDone := make(chan error)
@ -299,26 +293,23 @@ outer: @@ -299,26 +293,23 @@ outer:
for {
select {
case <-terminate:
conn.NetConn().Close()
conn.Close()
<-tcpConnDone
ret = false
break outer
case err := <-tcpConnDone:
conn.Close()
s.log("ERR: %s", err)
ret = true
break outer
}
}
s.p.events <- programEventSourceNotReady{s}
for _, lp := range listeners {
lp.serverRtp.Close()
lp.serverRtcp.Close()
}
wg.Wait()
s.p.events <- programEventSourceNotReady{s}
return ret
}
@ -326,6 +317,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo @@ -326,6 +317,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
for _, track := range s.tracks {
_, err := conn.SetupTcp(s.confp.sourceUrl, track)
if err != nil {
conn.Close()
s.log("ERR: %s", err)
return true
}
@ -333,6 +325,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo @@ -333,6 +325,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
_, err := conn.Play(s.confp.sourceUrl)
if err != nil {
conn.Close()
s.log("ERR: %s", err)
return true
}
@ -364,12 +357,13 @@ outer: @@ -364,12 +357,13 @@ outer:
for {
select {
case <-terminate:
conn.NetConn().Close()
conn.Close()
<-tcpConnDone
ret = false
break outer
case err := <-tcpConnDone:
conn.Close()
s.log("ERR: %s", err)
ret = true
break outer

Loading…
Cancel
Save