Browse Source

rename streamer into source

pull/52/head
aler9 6 years ago
parent
commit
1de43588ad
  1. 30
      main.go
  2. 24
      source-udpl.go
  3. 78
      source.go

30
main.go

@ -141,19 +141,19 @@ type programEventClientFrameTcp struct {
func (programEventClientFrameTcp) isProgramEvent() {} func (programEventClientFrameTcp) isProgramEvent() {}
type programEventStreamerReady struct { type programEventStreamerReady struct {
streamer *streamer source *source
} }
func (programEventStreamerReady) isProgramEvent() {} func (programEventStreamerReady) isProgramEvent() {}
type programEventStreamerNotReady struct { type programEventStreamerNotReady struct {
streamer *streamer source *source
} }
func (programEventStreamerNotReady) isProgramEvent() {} func (programEventStreamerNotReady) isProgramEvent() {}
type programEventStreamerFrame struct { type programEventStreamerFrame struct {
streamer *streamer source *source
trackId int trackId int
streamType gortsplib.StreamType streamType gortsplib.StreamType
buf []byte buf []byte
@ -165,7 +165,7 @@ type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {} func (programEventTerminate) isProgramEvent() {}
// a publisher can be either a serverClient or a streamer // a publisher can be either a serverClient or a source
type publisher interface { type publisher interface {
publisherIsReady() bool publisherIsReady() bool
publisherSdpText() []byte publisherSdpText() []byte
@ -178,7 +178,7 @@ type program struct {
rtpl *serverUdpListener rtpl *serverUdpListener
rtcpl *serverUdpListener rtcpl *serverUdpListener
clients map[*serverClient]struct{} clients map[*serverClient]struct{}
streamers []*streamer sources []*source
publishers map[string]publisher publishers map[string]publisher
publisherCount int publisherCount int
receiverCount int receiverCount int
@ -216,12 +216,12 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
for path, pconf := range conf.Paths { for path, pconf := range conf.Paths {
if pconf.Source != "record" { if pconf.Source != "record" {
s, err := newStreamer(p, path, pconf.Source, pconf.SourceProtocol) s, err := newSource(p, path, pconf.Source, pconf.SourceProtocol)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.streamers = append(p.streamers, s) p.sources = append(p.sources, s)
p.publishers[path] = s p.publishers[path] = s
} }
} }
@ -258,7 +258,7 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
go p.rtpl.run() go p.rtpl.run()
go p.rtcpl.run() go p.rtcpl.run()
go p.rtspl.run() go p.rtspl.run()
for _, s := range p.streamers { for _, s := range p.sources {
go s.run() go s.run()
} }
go p.run() go p.run()
@ -408,24 +408,24 @@ outer:
p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf)
case programEventStreamerReady: case programEventStreamerReady:
evt.streamer.ready = true evt.source.ready = true
p.publisherCount += 1 p.publisherCount += 1
evt.streamer.log("ready") evt.source.log("ready")
case programEventStreamerNotReady: case programEventStreamerNotReady:
evt.streamer.ready = false evt.source.ready = false
p.publisherCount -= 1 p.publisherCount -= 1
evt.streamer.log("not ready") evt.source.log("not ready")
// close all clients that share the same path // close all clients that share the same path
for oc := range p.clients { for oc := range p.clients {
if oc.path == evt.streamer.path { if oc.path == evt.source.path {
go oc.close() go oc.close()
} }
} }
case programEventStreamerFrame: case programEventStreamerFrame:
p.forwardFrame(evt.streamer.path, evt.trackId, evt.streamType, evt.buf) p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf)
case programEventTerminate: case programEventTerminate:
break outer break outer
@ -468,7 +468,7 @@ outer:
} }
}() }()
for _, s := range p.streamers { for _, s := range p.sources {
s.close() s.close()
} }

24
streamer-udpl.go → source-udpl.go

@ -7,9 +7,9 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
type streamerUdpListener struct { type sourceUdpListener struct {
p *program p *program
streamer *streamer source *source
trackId int trackId int
streamType gortsplib.StreamType streamType gortsplib.StreamType
publisherIp net.IP publisherIp net.IP
@ -22,8 +22,8 @@ type streamerUdpListener struct {
done chan struct{} done chan struct{}
} }
func newStreamerUdpListener(p *program, port int, streamer *streamer, func newSourceUdpListener(p *program, port int, source *source,
trackId int, streamType gortsplib.StreamType, publisherIp net.IP) (*streamerUdpListener, error) { trackId int, streamType gortsplib.StreamType, publisherIp net.IP) (*sourceUdpListener, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{ nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port, Port: port,
}) })
@ -31,9 +31,9 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer,
return nil, err return nil, err
} }
l := &streamerUdpListener{ l := &sourceUdpListener{
p: p, p: p,
streamer: streamer, source: source,
trackId: trackId, trackId: trackId,
streamType: streamType, streamType: streamType,
publisherIp: publisherIp, publisherIp: publisherIp,
@ -46,20 +46,20 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer,
return l, nil return l, nil
} }
func (l *streamerUdpListener) close() { func (l *sourceUdpListener) close() {
l.nconn.Close() // close twice l.nconn.Close() // close twice
} }
func (l *streamerUdpListener) start() { func (l *sourceUdpListener) start() {
go l.run() go l.run()
} }
func (l *streamerUdpListener) stop() { func (l *sourceUdpListener) stop() {
l.nconn.Close() l.nconn.Close()
<-l.done <-l.done
} }
func (l *streamerUdpListener) run() { func (l *sourceUdpListener) run() {
writeDone := make(chan struct{}) writeDone := make(chan struct{})
go func() { go func() {
defer close(writeDone) defer close(writeDone)
@ -80,8 +80,8 @@ func (l *streamerUdpListener) run() {
continue continue
} }
l.streamer.RtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) l.source.RtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n])
l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.streamType, buf[:n]} l.p.events <- programEventStreamerFrame{l.source, l.trackId, l.streamType, buf[:n]}
} }
close(l.writeChan) close(l.writeChan)

78
streamer.go → source.go

@ -12,18 +12,18 @@ import (
) )
const ( const (
streamerRetryInterval = 5 * time.Second sourceRetryInterval = 5 * time.Second
streamerCheckStreamInterval = 5 * time.Second sourceCheckStreamInterval = 5 * time.Second
streamerKeepaliveInterval = 60 * time.Second sourceKeepaliveInterval = 60 * time.Second
streamerReceiverReportInterval = 10 * time.Second sourceReceiverReportInterval = 10 * time.Second
) )
type streamerUdpListenerPair struct { type sourceUdpListenerPair struct {
rtpl *streamerUdpListener rtpl *sourceUdpListener
rtcpl *streamerUdpListener rtcpl *sourceUdpListener
} }
type streamer struct { type source struct {
p *program p *program
path string path string
u *url.URL u *url.URL
@ -39,13 +39,13 @@ type streamer struct {
done chan struct{} done chan struct{}
} }
func newStreamer(p *program, path string, source string, sourceProtocol string) (*streamer, error) { func newSource(p *program, path string, sourceStr string, sourceProtocol string) (*source, error) {
u, err := url.Parse(source) u, err := url.Parse(sourceStr)
if err != nil { if err != nil {
return nil, fmt.Errorf("'%s' is not a valid source not an RTSP url", source) return nil, fmt.Errorf("'%s' is not a valid RTSP url", sourceStr)
} }
if u.Scheme != "rtsp" { if u.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", source) return nil, fmt.Errorf("'%s' is not a valid RTSP url", sourceStr)
} }
if u.Port() == "" { if u.Port() == "" {
u.Host += ":554" u.Host += ":554"
@ -73,7 +73,7 @@ func newStreamer(p *program, path string, source string, sourceProtocol string)
return nil, err return nil, err
} }
s := &streamer{ s := &source{
p: p, p: p,
path: path, path: path,
u: u, u: u,
@ -86,30 +86,30 @@ func newStreamer(p *program, path string, source string, sourceProtocol string)
return s, nil return s, nil
} }
func (s *streamer) log(format string, args ...interface{}) { func (s *source) log(format string, args ...interface{}) {
s.p.log("[streamer "+s.path+"] "+format, args...) s.p.log("[source "+s.path+"] "+format, args...)
} }
func (s *streamer) publisherIsReady() bool { func (s *source) publisherIsReady() bool {
return s.ready return s.ready
} }
func (s *streamer) publisherSdpText() []byte { func (s *source) publisherSdpText() []byte {
return s.serverSdpText return s.serverSdpText
} }
func (s *streamer) publisherSdpParsed() *sdp.SessionDescription { func (s *source) publisherSdpParsed() *sdp.SessionDescription {
return s.serverSdpParsed return s.serverSdpParsed
} }
func (s *streamer) run() { func (s *source) run() {
for { for {
ok := s.do() ok := s.do()
if !ok { if !ok {
break break
} }
t := time.NewTimer(streamerRetryInterval) t := time.NewTimer(sourceRetryInterval)
select { select {
case <-s.terminate: case <-s.terminate:
break break
@ -120,7 +120,7 @@ func (s *streamer) run() {
close(s.done) close(s.done)
} }
func (s *streamer) do() bool { func (s *source) do() bool {
s.log("initializing with protocol %s", s.proto) s.log("initializing with protocol %s", s.proto)
var nconn net.Conn var nconn net.Conn
@ -175,13 +175,13 @@ func (s *streamer) do() bool {
} }
} }
func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
var streamerUdpListenerPairs []streamerUdpListenerPair var sourceUdpListenerPairs []sourceUdpListenerPair
defer func() { defer func() {
for _, pair := range streamerUdpListenerPairs { for _, pair := range sourceUdpListenerPairs {
pair.rtpl.close() pair.rtpl.close()
pair.rtcpl.close() pair.rtcpl.close()
} }
@ -190,8 +190,8 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
for i, media := range s.clientSdpParsed.MediaDescriptions { for i, media := range s.clientSdpParsed.MediaDescriptions {
var rtpPort int var rtpPort int
var rtcpPort int var rtcpPort int
var rtpl *streamerUdpListener var rtpl *sourceUdpListener
var rtcpl *streamerUdpListener var rtcpl *sourceUdpListener
func() { func() {
for { for {
// choose two consecutive ports in range 65536-10000 // choose two consecutive ports in range 65536-10000
@ -200,13 +200,13 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
rtcpPort = rtpPort + 1 rtcpPort = rtpPort + 1
var err error var err error
rtpl, err = newStreamerUdpListener(s.p, rtpPort, s, i, rtpl, err = newSourceUdpListener(s.p, rtpPort, s, i,
gortsplib.StreamTypeRtp, publisherIp) gortsplib.StreamTypeRtp, publisherIp)
if err != nil { if err != nil {
continue continue
} }
rtcpl, err = newStreamerUdpListener(s.p, rtcpPort, s, i, rtcpl, err = newSourceUdpListener(s.p, rtcpPort, s, i,
gortsplib.StreamTypeRtcp, publisherIp) gortsplib.StreamTypeRtcp, publisherIp)
if err != nil { if err != nil {
rtpl.close() rtpl.close()
@ -228,7 +228,7 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
rtpl.publisherPort = rtpServerPort rtpl.publisherPort = rtpServerPort
rtcpl.publisherPort = rtcpServerPort rtcpl.publisherPort = rtcpServerPort
streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{ sourceUdpListenerPairs = append(sourceUdpListenerPairs, sourceUdpListenerPair{
rtpl: rtpl, rtpl: rtpl,
rtcpl: rtcpl, rtcpl: rtcpl,
}) })
@ -245,14 +245,14 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
} }
for _, pair := range streamerUdpListenerPairs { for _, pair := range sourceUdpListenerPairs {
pair.rtpl.start() pair.rtpl.start()
pair.rtcpl.start() pair.rtcpl.start()
} }
sendKeepaliveTicker := time.NewTicker(streamerKeepaliveInterval) sendKeepaliveTicker := time.NewTicker(sourceKeepaliveInterval)
checkStreamTicker := time.NewTicker(streamerCheckStreamInterval) checkStreamTicker := time.NewTicker(sourceCheckStreamInterval)
receiverReportTicker := time.NewTicker(streamerReceiverReportInterval) receiverReportTicker := time.NewTicker(sourceReceiverReportInterval)
s.p.events <- programEventStreamerReady{s} s.p.events <- programEventStreamerReady{s}
@ -285,11 +285,11 @@ outer:
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
frame := s.RtcpReceivers[trackId].Report() frame := s.RtcpReceivers[trackId].Report()
streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ sourceUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone, Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone,
Port: streamerUdpListenerPairs[trackId].rtcpl.publisherPort, Port: sourceUdpListenerPairs[trackId].rtcpl.publisherPort,
}, },
buf: frame, buf: frame,
} }
@ -303,7 +303,7 @@ outer:
s.p.events <- programEventStreamerNotReady{s} s.p.events <- programEventStreamerNotReady{s}
for _, pair := range streamerUdpListenerPairs { for _, pair := range sourceUdpListenerPairs {
pair.rtpl.stop() pair.rtpl.stop()
pair.rtcpl.stop() pair.rtcpl.stop()
} }
@ -315,7 +315,7 @@ outer:
return ret return ret
} }
func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
for i, media := range s.clientSdpParsed.MediaDescriptions { for i, media := range s.clientSdpParsed.MediaDescriptions {
_, err := conn.SetupTcp(s.u, media, i) _, err := conn.SetupTcp(s.u, media, i)
if err != nil { if err != nil {
@ -359,7 +359,7 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
// a ticker to check the stream is not needed since there's already a deadline // a ticker to check the stream is not needed since there's already a deadline
// on the RTSP reads // on the RTSP reads
receiverReportTicker := time.NewTicker(streamerReceiverReportInterval) receiverReportTicker := time.NewTicker(sourceReceiverReportInterval)
var ret bool var ret bool
@ -398,7 +398,7 @@ outer:
return ret return ret
} }
func (s *streamer) close() { func (s *source) close() {
close(s.terminate) close(s.terminate)
<-s.done <-s.done
} }
Loading…
Cancel
Save