|
|
|
|
@ -5,6 +5,8 @@ import (
@@ -5,6 +5,8 @@ import (
|
|
|
|
|
"math/rand" |
|
|
|
|
"net" |
|
|
|
|
"net/url" |
|
|
|
|
"os" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/aler9/gortsplib" |
|
|
|
|
@ -12,17 +14,9 @@ import (
@@ -12,17 +14,9 @@ import (
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
sourceRetryInterval = 5 * time.Second |
|
|
|
|
sourceCheckStreamInterval = 5 * time.Second |
|
|
|
|
sourceKeepaliveInterval = 60 * time.Second |
|
|
|
|
sourceReceiverReportInterval = 10 * time.Second |
|
|
|
|
sourceRetryInterval = 5 * time.Second |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type sourceUdpListenerPair struct { |
|
|
|
|
rtpl *sourceUdpListener |
|
|
|
|
rtcpl *sourceUdpListener |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type source struct { |
|
|
|
|
p *program |
|
|
|
|
path string |
|
|
|
|
@ -32,7 +26,6 @@ type source struct {
@@ -32,7 +26,6 @@ type source struct {
|
|
|
|
|
tracks []*gortsplib.Track |
|
|
|
|
serverSdpText []byte |
|
|
|
|
serverSdpParsed *sdp.SessionDescription |
|
|
|
|
rtcpReceivers []*gortsplib.RtcpReceiver |
|
|
|
|
readBuf *doubleBuffer |
|
|
|
|
|
|
|
|
|
terminate chan struct{} |
|
|
|
|
@ -123,11 +116,15 @@ func (s *source) run() {
@@ -123,11 +116,15 @@ func (s *source) run() {
|
|
|
|
|
func (s *source) do() bool { |
|
|
|
|
s.log("initializing with protocol %s", s.proto) |
|
|
|
|
|
|
|
|
|
var nconn net.Conn |
|
|
|
|
var conn *gortsplib.ConnClient |
|
|
|
|
var err error |
|
|
|
|
dialDone := make(chan struct{}) |
|
|
|
|
go func() { |
|
|
|
|
nconn, err = net.DialTimeout("tcp", s.u.Host, s.p.conf.ReadTimeout) |
|
|
|
|
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ |
|
|
|
|
Host: s.u.Host, |
|
|
|
|
ReadTimeout: s.p.conf.ReadTimeout, |
|
|
|
|
WriteTimeout: s.p.conf.WriteTimeout, |
|
|
|
|
}) |
|
|
|
|
close(dialDone) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
@ -141,13 +138,8 @@ func (s *source) do() bool {
@@ -141,13 +138,8 @@ func (s *source) do() bool {
|
|
|
|
|
s.log("ERR: %s", err) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
defer nconn.Close() |
|
|
|
|
|
|
|
|
|
conn := gortsplib.NewConnClient(gortsplib.ConnClientConf{ |
|
|
|
|
Conn: nconn, |
|
|
|
|
ReadTimeout: s.p.conf.ReadTimeout, |
|
|
|
|
WriteTimeout: s.p.conf.WriteTimeout, |
|
|
|
|
}) |
|
|
|
|
defer conn.Close() |
|
|
|
|
|
|
|
|
|
_, err = conn.Options(s.u) |
|
|
|
|
if err != nil { |
|
|
|
|
@ -176,59 +168,42 @@ func (s *source) do() bool {
@@ -176,59 +168,42 @@ func (s *source) do() bool {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *source) runUdp(conn *gortsplib.ConnClient) bool { |
|
|
|
|
publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP |
|
|
|
|
|
|
|
|
|
var sourceUdpListenerPairs []sourceUdpListenerPair |
|
|
|
|
type trackListenerPair struct { |
|
|
|
|
rtpl *gortsplib.ConnClientUdpListener |
|
|
|
|
rtcpl *gortsplib.ConnClientUdpListener |
|
|
|
|
} |
|
|
|
|
var listeners []*trackListenerPair |
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
for _, pair := range sourceUdpListenerPairs { |
|
|
|
|
pair.rtpl.close() |
|
|
|
|
pair.rtcpl.close() |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
for _, track := range s.tracks { |
|
|
|
|
var rtpl *gortsplib.ConnClientUdpListener |
|
|
|
|
var rtcpl *gortsplib.ConnClientUdpListener |
|
|
|
|
var err error |
|
|
|
|
|
|
|
|
|
for i, track := range s.tracks { |
|
|
|
|
var rtpPort int |
|
|
|
|
var rtcpPort int |
|
|
|
|
var rtpl *sourceUdpListener |
|
|
|
|
var rtcpl *sourceUdpListener |
|
|
|
|
func() { |
|
|
|
|
for { |
|
|
|
|
// choose two consecutive ports in range 65536-10000
|
|
|
|
|
// rtp must be pair and rtcp odd
|
|
|
|
|
rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 |
|
|
|
|
rtcpPort = rtpPort + 1 |
|
|
|
|
|
|
|
|
|
var err error |
|
|
|
|
rtpl, err = newSourceUdpListener(s.p, rtpPort, s, i, |
|
|
|
|
gortsplib.StreamTypeRtp, publisherIp) |
|
|
|
|
if err != nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
for { |
|
|
|
|
// choose two consecutive ports in range 65536-10000
|
|
|
|
|
// rtp must be pair and rtcp odd
|
|
|
|
|
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 |
|
|
|
|
rtcpPort := rtpPort + 1 |
|
|
|
|
|
|
|
|
|
rtcpl, err = newSourceUdpListener(s.p, rtcpPort, s, i, |
|
|
|
|
gortsplib.StreamTypeRtcp, publisherIp) |
|
|
|
|
if err != nil { |
|
|
|
|
rtpl.close() |
|
|
|
|
continue |
|
|
|
|
rtpl, rtcpl, _, err = conn.SetupUdp(s.u, track, rtpPort, rtcpPort) |
|
|
|
|
if err != nil { |
|
|
|
|
// retry if it's a bind error
|
|
|
|
|
if nerr, ok := err.(*net.OpError); ok { |
|
|
|
|
if serr, ok := nerr.Err.(*os.SyscallError); ok { |
|
|
|
|
if serr.Syscall == "bind" { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return |
|
|
|
|
s.log("ERR: %s", err) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
rtpServerPort, rtcpServerPort, _, err := conn.SetupUdp(s.u, track, rtpPort, rtcpPort) |
|
|
|
|
if err != nil { |
|
|
|
|
s.log("ERR: %s", err) |
|
|
|
|
rtpl.close() |
|
|
|
|
rtcpl.close() |
|
|
|
|
return true |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rtpl.publisherPort = rtpServerPort |
|
|
|
|
rtcpl.publisherPort = rtcpServerPort |
|
|
|
|
|
|
|
|
|
sourceUdpListenerPairs = append(sourceUdpListenerPairs, sourceUdpListenerPair{ |
|
|
|
|
listeners = append(listeners, &trackListenerPair{ |
|
|
|
|
rtpl: rtpl, |
|
|
|
|
rtcpl: rtcpl, |
|
|
|
|
}) |
|
|
|
|
@ -240,26 +215,53 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
@@ -240,26 +215,53 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
|
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) |
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() |
|
|
|
|
} |
|
|
|
|
s.p.events <- programEventStreamerReady{s} |
|
|
|
|
|
|
|
|
|
for _, pair := range sourceUdpListenerPairs { |
|
|
|
|
pair.rtpl.start() |
|
|
|
|
pair.rtcpl.start() |
|
|
|
|
} |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerReady{s} |
|
|
|
|
for trackId, lp := range listeners { |
|
|
|
|
wg.Add(2) |
|
|
|
|
|
|
|
|
|
// receive RTP packets
|
|
|
|
|
go func(trackId int, l *gortsplib.ConnClientUdpListener) { |
|
|
|
|
defer wg.Done() |
|
|
|
|
|
|
|
|
|
doubleBuf := newDoubleBuffer(2048) |
|
|
|
|
for { |
|
|
|
|
buf := doubleBuf.swap() |
|
|
|
|
|
|
|
|
|
n, err := l.Read(buf) |
|
|
|
|
if err != nil { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} |
|
|
|
|
} |
|
|
|
|
}(trackId, lp.rtpl) |
|
|
|
|
|
|
|
|
|
// receive RTCP packets
|
|
|
|
|
go func(trackId int, l *gortsplib.ConnClientUdpListener) { |
|
|
|
|
defer wg.Done() |
|
|
|
|
|
|
|
|
|
doubleBuf := newDoubleBuffer(2048) |
|
|
|
|
for { |
|
|
|
|
buf := doubleBuf.swap() |
|
|
|
|
|
|
|
|
|
n, err := l.Read(buf) |
|
|
|
|
if err != nil { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} |
|
|
|
|
} |
|
|
|
|
}(trackId, lp.rtcpl) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tcpConnDone := make(chan error) |
|
|
|
|
go func() { |
|
|
|
|
tcpConnDone <- conn.LoopUDP(s.u) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
checkStreamTicker := time.NewTicker(sourceCheckStreamInterval) |
|
|
|
|
receiverReportTicker := time.NewTicker(sourceReceiverReportInterval) |
|
|
|
|
|
|
|
|
|
var ret bool |
|
|
|
|
|
|
|
|
|
outer: |
|
|
|
|
@ -275,46 +277,16 @@ outer:
@@ -275,46 +277,16 @@ outer:
|
|
|
|
|
s.log("ERR: %s", err) |
|
|
|
|
ret = true |
|
|
|
|
break outer |
|
|
|
|
|
|
|
|
|
case <-checkStreamTicker.C: |
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
if time.Since(s.rtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.ReadTimeout { |
|
|
|
|
s.log("ERR: stream is dead") |
|
|
|
|
conn.NetConn().Close() |
|
|
|
|
<-tcpConnDone |
|
|
|
|
ret = true |
|
|
|
|
break outer |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case <-receiverReportTicker.C: |
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
frame := s.rtcpReceivers[trackId].Report() |
|
|
|
|
sourceUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ |
|
|
|
|
addr: &net.UDPAddr{ |
|
|
|
|
IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, |
|
|
|
|
Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone, |
|
|
|
|
Port: sourceUdpListenerPairs[trackId].rtcpl.publisherPort, |
|
|
|
|
}, |
|
|
|
|
buf: frame, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
checkStreamTicker.Stop() |
|
|
|
|
receiverReportTicker.Stop() |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerNotReady{s} |
|
|
|
|
|
|
|
|
|
for _, pair := range sourceUdpListenerPairs { |
|
|
|
|
pair.rtpl.stop() |
|
|
|
|
pair.rtcpl.stop() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
s.rtcpReceivers[trackId].Close() |
|
|
|
|
for _, lp := range listeners { |
|
|
|
|
lp.rtpl.Close() |
|
|
|
|
lp.rtcpl.Close() |
|
|
|
|
} |
|
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
|
|
return ret |
|
|
|
|
} |
|
|
|
|
@ -334,11 +306,6 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
@@ -334,11 +306,6 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
|
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) |
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerReady{s} |
|
|
|
|
|
|
|
|
|
frame := &gortsplib.InterleavedFrame{} |
|
|
|
|
@ -355,15 +322,10 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
@@ -355,15 +322,10 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) |
|
|
|
|
s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// a ticker to check the stream is not needed since there's already a deadline
|
|
|
|
|
// on the RTSP reads
|
|
|
|
|
receiverReportTicker := time.NewTicker(sourceReceiverReportInterval) |
|
|
|
|
|
|
|
|
|
var ret bool |
|
|
|
|
|
|
|
|
|
outer: |
|
|
|
|
@ -379,28 +341,11 @@ outer:
@@ -379,28 +341,11 @@ outer:
|
|
|
|
|
s.log("ERR: %s", err) |
|
|
|
|
ret = true |
|
|
|
|
break outer |
|
|
|
|
|
|
|
|
|
case <-receiverReportTicker.C: |
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
frame := s.rtcpReceivers[trackId].Report() |
|
|
|
|
|
|
|
|
|
conn.WriteFrame(&gortsplib.InterleavedFrame{ |
|
|
|
|
TrackId: trackId, |
|
|
|
|
StreamType: gortsplib.StreamTypeRtcp, |
|
|
|
|
Content: frame, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
receiverReportTicker.Stop() |
|
|
|
|
|
|
|
|
|
s.p.events <- programEventStreamerNotReady{s} |
|
|
|
|
|
|
|
|
|
for trackId := range s.tracks { |
|
|
|
|
s.rtcpReceivers[trackId].Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return ret |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|