Browse Source

update gortsplib

pull/169/head
aler9 5 years ago
parent
commit
f63647ab1e
  1. 2
      go.mod
  2. 4
      go.sum
  3. 54
      internal/client/client.go
  4. 6
      internal/serverudp/server.go
  5. 20
      internal/sourcertmp/source.go
  6. 16
      internal/sourcertsp/source.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20201115164941-3f45d21f1132 github.com/aler9/gortsplib v0.0.0-20201115231025-c24d3f677b43
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20201115164941-3f45d21f1132 h1:yPtvZE/tfSPkQn7R2TkBv0I9t318yKDgCwynHzuCK5E= github.com/aler9/gortsplib v0.0.0-20201115231025-c24d3f677b43 h1:Y+Cl50nLvue3T56uRB1wnq5yLbHpxsjwTjfBZMfA3Qs=
github.com/aler9/gortsplib v0.0.0-20201115164941-3f45d21f1132/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M= github.com/aler9/gortsplib v0.0.0-20201115231025-c24d3f677b43/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

54
internal/client/client.go

@ -884,25 +884,25 @@ func (c *Client) handleRequest(req *base.Request) error {
} }
func (c *Client) runInitial() bool { func (c *Client) runInitial() bool {
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
req, err := c.conn.ReadRequest() req, err := c.conn.ReadRequest()
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
err = c.handleRequest(req) err = c.handleRequest(req)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
} }
}() }()
select { select {
case err := <-readDone: case err := <-readerDone:
switch err { switch err {
case errStateWaitingDescribe: case errStateWaitingDescribe:
return c.runWaitingDescribe() return c.runWaitingDescribe()
@ -926,7 +926,7 @@ func (c *Client) runInitial() bool {
case <-c.terminate: case <-c.terminate:
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
return false return false
} }
} }
@ -1024,25 +1024,25 @@ func (c *Client) runPlay() bool {
} }
func (c *Client) runPlayUDP() bool { func (c *Client) runPlayUDP() bool {
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
req, err := c.conn.ReadRequest() req, err := c.conn.ReadRequest()
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
err = c.handleRequest(req) err = c.handleRequest(req)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
} }
}() }()
select { select {
case err := <-readDone: case err := <-readerDone:
if err == errStateInitial { if err == errStateInitial {
c.state = statePrePlay c.state = statePrePlay
c.path.OnClientPause(c) c.path.OnClientPause(c)
@ -1067,7 +1067,7 @@ func (c *Client) runPlayUDP() bool {
c.path = nil c.path = nil
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
return false return false
} }
} }
@ -1076,12 +1076,12 @@ func (c *Client) runPlayTCP() bool {
readRequest := make(chan readRequestPair) readRequest := make(chan readRequestPair)
defer close(readRequest) defer close(readRequest)
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
recv, err := c.conn.ReadFrameTCPOrRequest(false) recv, err := c.conn.ReadFrameTCPOrRequest(false)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
@ -1094,7 +1094,7 @@ func (c *Client) runPlayTCP() bool {
readRequest <- readRequestPair{recvt, res} readRequest <- readRequestPair{recvt, res}
err := <-res err := <-res
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
} }
@ -1107,7 +1107,7 @@ func (c *Client) runPlayTCP() bool {
case req := <-readRequest: case req := <-readRequest:
req.res <- c.handleRequest(req.req) req.res <- c.handleRequest(req.req)
case err := <-readDone: case err := <-readerDone:
if err == errStateInitial { if err == errStateInitial {
ch := c.tcpFrame ch := c.tcpFrame
go func() { go func() {
@ -1165,7 +1165,7 @@ func (c *Client) runPlayTCP() bool {
close(c.tcpFrame) close(c.tcpFrame)
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
return false return false
} }
} }
@ -1242,18 +1242,18 @@ func (c *Client) runRecord() bool {
} }
func (c *Client) runRecordUDP() bool { func (c *Client) runRecordUDP() bool {
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
req, err := c.conn.ReadRequest() req, err := c.conn.ReadRequest()
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
err = c.handleRequest(req) err = c.handleRequest(req)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
} }
@ -1267,7 +1267,7 @@ func (c *Client) runRecordUDP() bool {
for { for {
select { select {
case err := <-readDone: case err := <-readerDone:
if err == errStateInitial { if err == errStateInitial {
for _, track := range c.streamTracks { for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
@ -1312,7 +1312,7 @@ func (c *Client) runRecordUDP() bool {
c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)") c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)")
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
c.path.OnClientRemove(c) c.path.OnClientRemove(c)
c.path = nil c.path = nil
@ -1341,7 +1341,7 @@ func (c *Client) runRecordUDP() bool {
} }
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
c.path.OnClientRemove(c) c.path.OnClientRemove(c)
c.path = nil c.path = nil
@ -1355,19 +1355,19 @@ func (c *Client) runRecordTCP() bool {
readRequest := make(chan readRequestPair) readRequest := make(chan readRequestPair)
defer close(readRequest) defer close(readRequest)
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
recv, err := c.conn.ReadFrameTCPOrRequest(true) recv, err := c.conn.ReadFrameTCPOrRequest(true)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
switch recvt := recv.(type) { switch recvt := recv.(type) {
case *base.InterleavedFrame: case *base.InterleavedFrame:
if recvt.TrackId >= len(c.streamTracks) { if recvt.TrackId >= len(c.streamTracks) {
readDone <- fmt.Errorf("invalid track id '%d'", recvt.TrackId) readerDone <- fmt.Errorf("invalid track id '%d'", recvt.TrackId)
return return
} }
@ -1377,7 +1377,7 @@ func (c *Client) runRecordTCP() bool {
case *base.Request: case *base.Request:
err := c.handleRequest(recvt) err := c.handleRequest(recvt)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
} }
@ -1393,7 +1393,7 @@ func (c *Client) runRecordTCP() bool {
case req := <-readRequest: case req := <-readRequest:
req.res <- c.handleRequest(req.req) req.res <- c.handleRequest(req.req)
case err := <-readDone: case err := <-readerDone:
if err == errStateInitial { if err == errStateInitial {
c.state = statePreRecord c.state = statePreRecord
c.path.OnClientPause(c) c.path.OnClientPause(c)
@ -1429,7 +1429,7 @@ func (c *Client) runRecordTCP() bool {
}() }()
c.conn.Close() c.conn.Close()
<-readDone <-readerDone
c.path.OnClientRemove(c) c.path.OnClientRemove(c)
c.path = nil c.path = nil

6
internal/serverudp/server.go

@ -111,9 +111,9 @@ func (s *Server) Close() {
func (s *Server) run() { func (s *Server) run() {
defer close(s.done) defer close(s.done)
writeDone := make(chan struct{}) writerDone := make(chan struct{})
go func() { go func() {
defer close(writeDone) defer close(writerDone)
for w := range s.write { for w := range s.write {
s.pc.SetWriteDeadline(time.Now().Add(s.writeTimeout)) s.pc.SetWriteDeadline(time.Now().Add(s.writeTimeout))
s.pc.WriteTo(w.buf, w.addr) s.pc.WriteTo(w.buf, w.addr)
@ -144,7 +144,7 @@ func (s *Server) run() {
} }
close(s.write) close(s.write)
<-writeDone <-writerDone
} }
// Port returns the server local port. // Port returns the server local port.

20
internal/sourcertmp/source.go

@ -229,33 +229,33 @@ func (s *Source) runInner() bool {
s.parent.Log("rtmp source ready") s.parent.Log("rtmp source ready")
s.parent.OnSourceSetReady(tracks) s.parent.OnSourceSetReady(tracks)
readDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
for { for {
pkt, err := conn.ReadPacket() pkt, err := conn.ReadPacket()
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
switch pkt.Type { switch pkt.Type {
case av.H264: case av.H264:
if h264Sps == nil { if h264Sps == nil {
readDone <- fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up") readerDone <- fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up")
return return
} }
// decode from AVCC format // decode from AVCC format
nalus, typ := h264.SplitNALUs(pkt.Data) nalus, typ := h264.SplitNALUs(pkt.Data)
if typ != h264.NALU_AVCC { if typ != h264.NALU_AVCC {
readDone <- fmt.Errorf("invalid NALU format (%d)", typ) readerDone <- fmt.Errorf("invalid NALU format (%d)", typ)
return return
} }
// encode into RTP/H264 format // encode into RTP/H264 format
frames, err := h264Encoder.Write(nalus, pkt.Time+pkt.CTime) frames, err := h264Encoder.Write(nalus, pkt.Time+pkt.CTime)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
@ -265,13 +265,13 @@ func (s *Source) runInner() bool {
case av.AAC: case av.AAC:
if aacConfig == nil { if aacConfig == nil {
readDone <- fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up") readerDone <- fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up")
return return
} }
frames, err := aacEncoder.Write(pkt.Data, pkt.Time+pkt.CTime) frames, err := aacEncoder.Write(pkt.Data, pkt.Time+pkt.CTime)
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
@ -280,7 +280,7 @@ func (s *Source) runInner() bool {
} }
default: default:
readDone <- fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type) readerDone <- fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type)
return return
} }
} }
@ -293,11 +293,11 @@ outer:
select { select {
case <-s.terminate: case <-s.terminate:
nconn.Close() nconn.Close()
<-readDone <-readerDone
ret = false ret = false
break outer break outer
case err := <-readDone: case err := <-readerDone:
nconn.Close() nconn.Close()
s.parent.Log("rtmp source ERR: %s", err) s.parent.Log("rtmp source ERR: %s", err)
ret = true ret = true

16
internal/sourcertsp/source.go

@ -139,18 +139,16 @@ func (s *Source) runInner() bool {
s.parent.Log("rtsp source ready") s.parent.Log("rtsp source ready")
s.parent.OnSourceSetReady(tracks) s.parent.OnSourceSetReady(tracks)
readDone := make(chan error) readerDone := make(chan error)
go func() {
for { conn.OnFrame(func(trackId int, streamType gortsplib.StreamType, content []byte, err error) {
trackId, streamType, content, err := conn.ReadFrame()
if err != nil { if err != nil {
readDone <- err readerDone <- err
return return
} }
s.parent.OnFrame(trackId, streamType, content) s.parent.OnFrame(trackId, streamType, content)
} })
}()
var ret bool var ret bool
@ -159,11 +157,11 @@ outer:
select { select {
case <-s.terminate: case <-s.terminate:
conn.Close() conn.Close()
<-readDone <-readerDone
ret = false ret = false
break outer break outer
case err := <-readDone: case err := <-readerDone:
conn.Close() conn.Close()
s.parent.Log("rtsp source ERR: %s", err) s.parent.Log("rtsp source ERR: %s", err)
ret = true ret = true

Loading…
Cancel
Save