Browse Source

update gortsplib

pull/80/head
aler9 5 years ago
parent
commit
3fca02eca3
  1. 78
      client.go
  2. 8
      conf.go
  3. 2
      go.mod
  4. 4
      go.sum
  5. 44
      main.go
  6. 2
      main_test.go
  7. 12
      server-tcp.go
  8. 18
      server-udp.go
  9. 34
      source.go

78
client.go

@ -19,10 +19,10 @@ import ( @@ -19,10 +19,10 @@ import (
const (
clientCheckStreamInterval = 5 * time.Second
clientReceiverReportInterval = 10 * time.Second
clientTcpReadBufferSize = 128 * 1024
clientTcpWriteBufferSize = 128 * 1024
clientUdpReadBufferSize = 2048
clientUdpWriteBufferSize = 128 * 1024
clientTCPReadBufferSize = 128 * 1024
clientTCPWriteBufferSize = 128 * 1024
clientUDPReadBufferSize = 2048
clientUDPWriteBufferSize = 128 * 1024
)
type clientDescribeReq struct {
@ -45,13 +45,13 @@ type clientSetupPlayReq struct { @@ -45,13 +45,13 @@ type clientSetupPlayReq struct {
trackId int
}
type clientFrameUdpReq struct {
type clientFrameUDPReq struct {
addr *net.UDPAddr
streamType gortsplib.StreamType
buf []byte
}
type clientFrameTcpReq struct {
type clientFrameTCPReq struct {
path *path
trackId int
streamType gortsplib.StreamType
@ -69,7 +69,7 @@ type udpClientAddr struct { @@ -69,7 +69,7 @@ type udpClientAddr struct {
port int
}
func makeUdpClientAddr(ip net.IP, port int) udpClientAddr {
func makeUDPClientAddr(ip net.IP, port int) udpClientAddr {
ret := udpClientAddr{
port: port,
}
@ -181,12 +181,12 @@ func (c *client) close() { @@ -181,12 +181,12 @@ func (c *client) close() {
case clientStateRecord:
atomic.AddInt64(&c.p.countPublisher, -1)
if c.streamProtocol == gortsplib.StreamProtocolUdp {
if c.streamProtocol == gortsplib.StreamProtocolUDP {
for _, track := range c.streamTracks {
key := makeUdpClientAddr(c.ip(), track.rtpPort)
key := makeUDPClientAddr(c.ip(), track.rtpPort)
delete(c.p.udpClientsByAddr, key)
key = makeUdpClientAddr(c.ip(), track.rtcpPort)
key = makeUDPClientAddr(c.ip(), track.rtcpPort)
delete(c.p.udpClientsByAddr, key)
}
}
@ -545,13 +545,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -545,13 +545,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
}
// play via UDP
if th.IsUdp() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
if th.IsUDP() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUDP]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return errRunTerminate
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUDP {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return errRunTerminate
}
@ -570,7 +570,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -570,7 +570,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return errRunTerminate
}
c.streamProtocol = gortsplib.StreamProtocolUdp
c.streamProtocol = gortsplib.StreamProtocolUDP
c.streamTracks[trackId] = &clientTrack{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
@ -592,13 +592,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -592,13 +592,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return nil
// play via TCP
} else if th.IsTcp() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
} else if th.IsTCP() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTCP]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return errRunTerminate
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTCP {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return errRunTerminate
}
@ -611,7 +611,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -611,7 +611,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return errRunTerminate
}
c.streamProtocol = gortsplib.StreamProtocolTcp
c.streamProtocol = gortsplib.StreamProtocolTCP
c.streamTracks[trackId] = &clientTrack{
rtpPort: 0,
rtcpPort: 0,
@ -652,13 +652,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -652,13 +652,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
}
// record via UDP
if th.IsUdp() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
if th.IsUDP() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUDP]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return errRunTerminate
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUDP {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return errRunTerminate
}
@ -674,7 +674,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -674,7 +674,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return errRunTerminate
}
c.streamProtocol = gortsplib.StreamProtocolUdp
c.streamProtocol = gortsplib.StreamProtocolUDP
c.streamTracks[len(c.streamTracks)] = &clientTrack{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
@ -696,13 +696,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -696,13 +696,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return nil
// record via TCP
} else if th.IsTcp() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
} else if th.IsTCP() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTCP]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return errRunTerminate
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTCP {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return errRunTerminate
}
@ -724,7 +724,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { @@ -724,7 +724,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return errRunTerminate
}
c.streamProtocol = gortsplib.StreamProtocolTcp
c.streamProtocol = gortsplib.StreamProtocolTCP
c.streamTracks[len(c.streamTracks)] = &clientTrack{
rtpPort: 0,
rtcpPort: 0,
@ -924,10 +924,10 @@ func (c *client) runPlay() bool { @@ -924,10 +924,10 @@ func (c *client) runPlay() bool {
}
}
if c.streamProtocol == gortsplib.StreamProtocolUdp {
c.runPlayUdp()
if c.streamProtocol == gortsplib.StreamProtocolUDP {
c.runPlayUDP()
} else {
c.runPlayTcp()
c.runPlayTCP()
}
if onReadCmd != nil {
@ -938,7 +938,7 @@ func (c *client) runPlay() bool { @@ -938,7 +938,7 @@ func (c *client) runPlay() bool {
return false
}
func (c *client) runPlayUdp() {
func (c *client) runPlayUDP() {
readDone := make(chan error)
go func() {
for {
@ -973,11 +973,11 @@ func (c *client) runPlayUdp() { @@ -973,11 +973,11 @@ func (c *client) runPlayUdp() {
}
}
func (c *client) runPlayTcp() {
func (c *client) runPlayTCP() {
readDone := make(chan error)
go func() {
frame := &gortsplib.InterleavedFrame{}
readBuf := make([]byte, clientTcpReadBufferSize)
readBuf := make([]byte, clientTCPReadBufferSize)
for {
frame.Content = readBuf
@ -1058,10 +1058,10 @@ func (c *client) runRecord() bool { @@ -1058,10 +1058,10 @@ func (c *client) runRecord() bool {
}
}
if c.streamProtocol == gortsplib.StreamProtocolUdp {
c.runRecordUdp()
if c.streamProtocol == gortsplib.StreamProtocolUDP {
c.runRecordUDP()
} else {
c.runRecordTcp()
c.runRecordTCP()
}
if onPublishCmd != nil {
@ -1076,7 +1076,7 @@ func (c *client) runRecord() bool { @@ -1076,7 +1076,7 @@ func (c *client) runRecord() bool {
return false
}
func (c *client) runRecordUdp() {
func (c *client) runRecordUDP() {
// open the firewall by sending packets to every channel
for _, track := range c.streamTracks {
c.p.serverRtp.write(
@ -1160,9 +1160,9 @@ func (c *client) runRecordUdp() { @@ -1160,9 +1160,9 @@ func (c *client) runRecordUdp() {
}
}
func (c *client) runRecordTcp() {
func (c *client) runRecordTCP() {
frame := &gortsplib.InterleavedFrame{}
readBuf := newMultiBuffer(3, clientTcpReadBufferSize)
readBuf := newMultiBuffer(3, clientTCPReadBufferSize)
readDone := make(chan error)
go func() {
@ -1184,7 +1184,7 @@ func (c *client) runRecordTcp() { @@ -1184,7 +1184,7 @@ func (c *client) runRecordTcp() {
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.clientFrameTcp <- clientFrameTcpReq{
c.p.clientFrameTCP <- clientFrameTCPReq{
c.path,
frame.TrackId,
frame.StreamType,

8
conf.go

@ -96,10 +96,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { @@ -96,10 +96,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
for _, proto := range conf.Protocols {
switch proto {
case "udp":
conf.protocolsParsed[gortsplib.StreamProtocolUdp] = struct{}{}
conf.protocolsParsed[gortsplib.StreamProtocolUDP] = struct{}{}
case "tcp":
conf.protocolsParsed[gortsplib.StreamProtocolTcp] = struct{}{}
conf.protocolsParsed[gortsplib.StreamProtocolTCP] = struct{}{}
default:
return nil, fmt.Errorf("unsupported protocol: %s", proto)
@ -218,10 +218,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { @@ -218,10 +218,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}
switch confp.SourceProtocol {
case "udp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolUdp
confp.sourceProtocolParsed = gortsplib.StreamProtocolUDP
case "tcp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolTcp
confp.sourceProtocolParsed = gortsplib.StreamProtocolTCP
default:
return nil, fmt.Errorf("unsupported protocol '%s'", confp.SourceProtocol)

2
go.mod

@ -5,7 +5,7 @@ go 1.12 @@ -5,7 +5,7 @@ go 1.12
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200905104751-d8dc7b7cc45d
github.com/aler9/gortsplib v0.0.0-20200905105840-e895fcfc3776
github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.6.1

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200905104751-d8dc7b7cc45d h1:NHaohlECn3Ri2kIxPQpXXMGYBfpsUP7lNcIHoqz7aps=
github.com/aler9/gortsplib v0.0.0-20200905104751-d8dc7b7cc45d/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8=
github.com/aler9/gortsplib v0.0.0-20200905105840-e895fcfc3776 h1:sQA2DjBj1VHYja5znLKmyIcSKv1/+kpyAn6qg/1WYJQ=
github.com/aler9/gortsplib v0.0.0-20200905105840-e895fcfc3776/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd h1:s/l20rPNGiyjggMdkhsLu0aQ0K0OFcROUMBDu7fGT+I=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

44
main.go

@ -32,9 +32,9 @@ type program struct { @@ -32,9 +32,9 @@ type program struct {
metrics *metrics
pprof *pprof
paths map[string]*path
serverRtp *serverUdp
serverRtcp *serverUdp
serverRtsp *serverTcp
serverRtp *serverUDP
serverRtcp *serverUDP
serverRtsp *serverTCP
clients map[*client]struct{}
udpClientsByAddr map[udpClientAddr]*udpClient
countClient int64
@ -49,8 +49,8 @@ type program struct { @@ -49,8 +49,8 @@ type program struct {
clientSetupPlay chan clientSetupPlayReq
clientPlay chan *client
clientRecord chan *client
clientFrameUdp chan clientFrameUdpReq
clientFrameTcp chan clientFrameTcpReq
clientFrameUDP chan clientFrameUDPReq
clientFrameTCP chan clientFrameTCPReq
sourceReady chan *source
sourceNotReady chan *source
sourceFrame chan sourceFrameReq
@ -90,8 +90,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { @@ -90,8 +90,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
clientSetupPlay: make(chan clientSetupPlayReq),
clientPlay: make(chan *client),
clientRecord: make(chan *client),
clientFrameUdp: make(chan clientFrameUdpReq),
clientFrameTcp: make(chan clientFrameTcpReq),
clientFrameUDP: make(chan clientFrameUDPReq),
clientFrameTCP: make(chan clientFrameTCPReq),
sourceReady: make(chan *source),
sourceNotReady: make(chan *source),
sourceFrame: make(chan sourceFrameReq),
@ -129,19 +129,19 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { @@ -129,19 +129,19 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
p.paths[name] = newPath(p, name, confp, true)
}
if _, ok := conf.protocolsParsed[gortsplib.StreamProtocolUdp]; ok {
p.serverRtp, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp)
if _, ok := conf.protocolsParsed[gortsplib.StreamProtocolUDP]; ok {
p.serverRtp, err = newServerUDP(p, conf.RtpPort, gortsplib.StreamTypeRtp)
if err != nil {
return nil, err
}
p.serverRtcp, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
p.serverRtcp, err = newServerUDP(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
if err != nil {
return nil, err
}
}
p.serverRtsp, err = newServerTcp(p)
p.serverRtsp, err = newServerTCP(p)
if err != nil {
return nil, err
}
@ -273,16 +273,16 @@ outer: @@ -273,16 +273,16 @@ outer:
atomic.AddInt64(&p.countPublisher, 1)
client.state = clientStateRecord
if client.streamProtocol == gortsplib.StreamProtocolUdp {
if client.streamProtocol == gortsplib.StreamProtocolUDP {
for trackId, track := range client.streamTracks {
key := makeUdpClientAddr(client.ip(), track.rtpPort)
key := makeUDPClientAddr(client.ip(), track.rtpPort)
p.udpClientsByAddr[key] = &udpClient{
client: client,
trackId: trackId,
streamType: gortsplib.StreamTypeRtp,
}
key = makeUdpClientAddr(client.ip(), track.rtcpPort)
key = makeUDPClientAddr(client.ip(), track.rtcpPort)
p.udpClientsByAddr[key] = &udpClient{
client: client,
trackId: trackId,
@ -293,8 +293,8 @@ outer: @@ -293,8 +293,8 @@ outer:
client.path.onPublisherSetReady()
case req := <-p.clientFrameUdp:
pub, ok := p.udpClientsByAddr[makeUdpClientAddr(req.addr.IP, req.addr.Port)]
case req := <-p.clientFrameUDP:
pub, ok := p.udpClientsByAddr[makeUDPClientAddr(req.addr.IP, req.addr.Port)]
if !ok {
continue
}
@ -307,7 +307,7 @@ outer: @@ -307,7 +307,7 @@ outer:
pub.client.rtcpReceivers[pub.trackId].OnFrame(req.streamType, req.buf)
p.forwardFrame(pub.client.path, pub.trackId, req.streamType, req.buf)
case req := <-p.clientFrameTcp:
case req := <-p.clientFrameTCP:
p.forwardFrame(req.path, req.trackId, req.streamType, req.buf)
case source := <-p.sourceReady:
@ -347,8 +347,8 @@ outer: @@ -347,8 +347,8 @@ outer:
case <-p.clientPlay:
case <-p.clientRecord:
case <-p.clientFrameUdp:
case <-p.clientFrameTcp:
case <-p.clientFrameUDP:
case <-p.clientFrameTCP:
case <-p.sourceReady:
case <-p.sourceNotReady:
case <-p.sourceFrame:
@ -395,8 +395,8 @@ outer: @@ -395,8 +395,8 @@ outer:
close(p.clientSetupPlay)
close(p.clientPlay)
close(p.clientRecord)
close(p.clientFrameUdp)
close(p.clientFrameTcp)
close(p.clientFrameUDP)
close(p.clientFrameTCP)
close(p.sourceReady)
close(p.sourceNotReady)
close(p.sourceFrame)
@ -432,7 +432,7 @@ func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.Str @@ -432,7 +432,7 @@ func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.Str
continue
}
if c.streamProtocol == gortsplib.StreamProtocolUdp {
if c.streamProtocol == gortsplib.StreamProtocolUDP {
if streamType == gortsplib.StreamTypeRtp {
p.serverRtp.write(frame, &net.UDPAddr{
IP: c.ip(),

2
main_test.go

@ -213,7 +213,7 @@ func TestRead(t *testing.T) { @@ -213,7 +213,7 @@ func TestRead(t *testing.T) {
}
}
func TestTcpOnly(t *testing.T) {
func TestTCPOnly(t *testing.T) {
stdin := []byte("\n" +
"protocols: [tcp]\n")
p, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin))

12
server-tcp.go

@ -4,14 +4,14 @@ import ( @@ -4,14 +4,14 @@ import (
"net"
)
type serverTcp struct {
type serverTCP struct {
p *program
listener *net.TCPListener
done chan struct{}
}
func newServerTcp(p *program) (*serverTcp, error) {
func newServerTCP(p *program) (*serverTCP, error) {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: p.conf.RtspPort,
})
@ -19,7 +19,7 @@ func newServerTcp(p *program) (*serverTcp, error) { @@ -19,7 +19,7 @@ func newServerTcp(p *program) (*serverTcp, error) {
return nil, err
}
l := &serverTcp{
l := &serverTCP{
p: p,
listener: listener,
done: make(chan struct{}),
@ -29,11 +29,11 @@ func newServerTcp(p *program) (*serverTcp, error) { @@ -29,11 +29,11 @@ func newServerTcp(p *program) (*serverTcp, error) {
return l, nil
}
func (l *serverTcp) log(format string, args ...interface{}) {
func (l *serverTCP) log(format string, args ...interface{}) {
l.p.log("[TCP listener] "+format, args...)
}
func (l *serverTcp) run() {
func (l *serverTCP) run() {
for {
conn, err := l.listener.AcceptTCP()
if err != nil {
@ -46,7 +46,7 @@ func (l *serverTcp) run() { @@ -46,7 +46,7 @@ func (l *serverTcp) run() {
close(l.done)
}
func (l *serverTcp) close() {
func (l *serverTCP) close() {
l.listener.Close()
<-l.done
}

18
server-udp.go

@ -12,7 +12,7 @@ type udpBufAddrPair struct { @@ -12,7 +12,7 @@ type udpBufAddrPair struct {
addr *net.UDPAddr
}
type serverUdp struct {
type serverUDP struct {
p *program
pc *net.UDPConn
streamType gortsplib.StreamType
@ -22,7 +22,7 @@ type serverUdp struct { @@ -22,7 +22,7 @@ type serverUdp struct {
done chan struct{}
}
func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serverUdp, error) {
func newServerUDP(p *program, port int, streamType gortsplib.StreamType) (*serverUDP, error) {
pc, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
@ -30,11 +30,11 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve @@ -30,11 +30,11 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve
return nil, err
}
l := &serverUdp{
l := &serverUDP{
p: p,
pc: pc,
streamType: streamType,
readBuf: newMultiBuffer(3, clientUdpReadBufferSize),
readBuf: newMultiBuffer(3, clientUDPReadBufferSize),
writec: make(chan udpBufAddrPair),
done: make(chan struct{}),
}
@ -43,7 +43,7 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve @@ -43,7 +43,7 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve
return l, nil
}
func (l *serverUdp) log(format string, args ...interface{}) {
func (l *serverUDP) log(format string, args ...interface{}) {
var label string
if l.streamType == gortsplib.StreamTypeRtp {
label = "RTP"
@ -53,7 +53,7 @@ func (l *serverUdp) log(format string, args ...interface{}) { @@ -53,7 +53,7 @@ func (l *serverUdp) log(format string, args ...interface{}) {
l.p.log("[UDP/"+label+" listener] "+format, args...)
}
func (l *serverUdp) run() {
func (l *serverUDP) run() {
writeDone := make(chan struct{})
go func() {
defer close(writeDone)
@ -70,7 +70,7 @@ func (l *serverUdp) run() { @@ -70,7 +70,7 @@ func (l *serverUdp) run() {
break
}
l.p.clientFrameUdp <- clientFrameUdpReq{
l.p.clientFrameUDP <- clientFrameUDPReq{
addr,
l.streamType,
buf[:n],
@ -83,11 +83,11 @@ func (l *serverUdp) run() { @@ -83,11 +83,11 @@ func (l *serverUdp) run() {
close(l.done)
}
func (l *serverUdp) close() {
func (l *serverUDP) close() {
l.pc.Close()
<-l.done
}
func (l *serverUdp) write(data []byte, addr *net.UDPAddr) {
func (l *serverUDP) write(data []byte, addr *net.UDPAddr) {
l.writec <- udpBufAddrPair{data, addr}
}

34
source.go

@ -12,8 +12,8 @@ import ( @@ -12,8 +12,8 @@ import (
const (
sourceRetryInterval = 5 * time.Second
sourceUdpReadBufferSize = 2048
sourceTcpReadBufferSize = 128 * 1024
sourceUDPReadBufferSize = 2048
sourceTCPReadBufferSize = 128 * 1024
)
type sourceFrameReq struct {
@ -182,16 +182,16 @@ func (s *source) runInnerInner() bool { @@ -182,16 +182,16 @@ func (s *source) runInnerInner() bool {
s.path.publisherSdpText = serverSdpText
s.path.publisherSdpParsed = serverSdpParsed
if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp {
return s.runUdp(conn)
if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUDP {
return s.runUDP(conn)
} else {
return s.runTcp(conn)
return s.runTCP(conn)
}
}
func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
var rtpReads []gortsplib.UdpReadFunc
var rtcpReads []gortsplib.UdpReadFunc
func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
var rtpReads []gortsplib.UDPReadFunc
var rtcpReads []gortsplib.UDPReadFunc
for _, track := range s.tracks {
for {
@ -200,7 +200,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { @@ -200,7 +200,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
rtpRead, rtcpRead, _, err := conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort)
rtpRead, rtcpRead, _, err := conn.SetupUDP(s.confp.sourceUrl, track, rtpPort, rtcpPort)
if err != nil {
// retry if it's a bind error
if nerr, ok := err.(*net.OpError); ok {
@ -236,10 +236,10 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { @@ -236,10 +236,10 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
// receive RTP packets
for trackId, rtpRead := range rtpReads {
wg.Add(1)
go func(trackId int, rtpRead gortsplib.UdpReadFunc) {
go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
defer wg.Done()
multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize)
multiBuf := newMultiBuffer(3, sourceUDPReadBufferSize)
for {
buf := multiBuf.next()
@ -256,10 +256,10 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { @@ -256,10 +256,10 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
// receive RTCP packets
for trackId, rtcpRead := range rtcpReads {
wg.Add(1)
go func(trackId int, rtcpRead gortsplib.UdpReadFunc) {
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
defer wg.Done()
multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize)
multiBuf := newMultiBuffer(3, sourceUDPReadBufferSize)
for {
buf := multiBuf.next()
@ -275,7 +275,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { @@ -275,7 +275,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
tcpConnDone := make(chan error)
go func() {
tcpConnDone <- conn.LoopUdp(s.confp.sourceUrl)
tcpConnDone <- conn.LoopUDP(s.confp.sourceUrl)
}()
var ret bool
@ -304,9 +304,9 @@ outer: @@ -304,9 +304,9 @@ outer:
return ret
}
func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
func (s *source) runTCP(conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks {
_, err := conn.SetupTcp(s.confp.sourceUrl, track)
_, err := conn.SetupTCP(s.confp.sourceUrl, track)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)
@ -324,7 +324,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { @@ -324,7 +324,7 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
s.p.sourceReady <- s
frame := &gortsplib.InterleavedFrame{}
multiBuf := newMultiBuffer(3, sourceTcpReadBufferSize)
multiBuf := newMultiBuffer(3, sourceTCPReadBufferSize)
tcpConnDone := make(chan error)
go func() {

Loading…
Cancel
Save