|
|
|
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
|
|
|
|
package path |
|
|
|
|
package core |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
@ -15,11 +15,6 @@ import (
@@ -15,11 +15,6 @@ import (
|
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/conf" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/externalcmd" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/logger" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/readpublisher" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/rtmpsource" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/rtspsource" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/source" |
|
|
|
|
"github.com/aler9/rtsp-simple-server/internal/stats" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func newEmptyTimer() *time.Timer { |
|
|
|
@ -28,13 +23,12 @@ func newEmptyTimer() *time.Timer {
@@ -28,13 +23,12 @@ func newEmptyTimer() *time.Timer {
|
|
|
|
|
return t |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Parent is implemented by pathman.PathMan.
|
|
|
|
|
type Parent interface { |
|
|
|
|
type pathParent interface { |
|
|
|
|
Log(logger.Level, string, ...interface{}) |
|
|
|
|
OnPathClose(*Path) |
|
|
|
|
OnPathClose(*path) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type rtspSession interface { |
|
|
|
|
type pathRTSPSession interface { |
|
|
|
|
IsRTSPSession() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -60,8 +54,43 @@ const (
@@ -60,8 +54,43 @@ const (
|
|
|
|
|
sourceStateReady |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Path is a path.
|
|
|
|
|
type Path struct { |
|
|
|
|
type reader interface { |
|
|
|
|
OnFrame(int, gortsplib.StreamType, []byte) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type readersMap struct { |
|
|
|
|
mutex sync.RWMutex |
|
|
|
|
ma map[reader]struct{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newReadersMap() *readersMap { |
|
|
|
|
return &readersMap{ |
|
|
|
|
ma: make(map[reader]struct{}), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m *readersMap) add(reader reader) { |
|
|
|
|
m.mutex.Lock() |
|
|
|
|
defer m.mutex.Unlock() |
|
|
|
|
m.ma[reader] = struct{}{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m *readersMap) remove(reader reader) { |
|
|
|
|
m.mutex.Lock() |
|
|
|
|
defer m.mutex.Unlock() |
|
|
|
|
delete(m.ma, reader) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (m *readersMap) forwardFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { |
|
|
|
|
m.mutex.RLock() |
|
|
|
|
defer m.mutex.RUnlock() |
|
|
|
|
|
|
|
|
|
for c := range m.ma { |
|
|
|
|
c.OnFrame(trackID, streamType, payload) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type path struct { |
|
|
|
|
rtspAddress string |
|
|
|
|
readTimeout time.Duration |
|
|
|
|
writeTimeout time.Duration |
|
|
|
@ -71,15 +100,15 @@ type Path struct {
@@ -71,15 +100,15 @@ type Path struct {
|
|
|
|
|
conf *conf.PathConf |
|
|
|
|
name string |
|
|
|
|
wg *sync.WaitGroup |
|
|
|
|
stats *stats.Stats |
|
|
|
|
parent Parent |
|
|
|
|
stats *stats |
|
|
|
|
parent pathParent |
|
|
|
|
|
|
|
|
|
ctx context.Context |
|
|
|
|
ctxCancel func() |
|
|
|
|
readPublishers map[readpublisher.ReadPublisher]readPublisherState |
|
|
|
|
describeRequests []readpublisher.DescribeReq |
|
|
|
|
setupPlayRequests []readpublisher.SetupPlayReq |
|
|
|
|
source source.Source |
|
|
|
|
readPublishers map[readPublisher]readPublisherState |
|
|
|
|
describeRequests []readPublisherDescribeReq |
|
|
|
|
setupPlayRequests []readPublisherSetupPlayReq |
|
|
|
|
source source |
|
|
|
|
sourceStream *gortsplib.ServerStream |
|
|
|
|
nonRTSPReaders *readersMap |
|
|
|
|
onDemandCmd *externalcmd.Cmd |
|
|
|
@ -94,20 +123,19 @@ type Path struct {
@@ -94,20 +123,19 @@ type Path struct {
|
|
|
|
|
closeTimerStarted bool |
|
|
|
|
|
|
|
|
|
// in
|
|
|
|
|
extSourceSetReady chan source.ExtSetReadyReq |
|
|
|
|
extSourceSetNotReady chan source.ExtSetNotReadyReq |
|
|
|
|
describeReq chan readpublisher.DescribeReq |
|
|
|
|
setupPlayReq chan readpublisher.SetupPlayReq |
|
|
|
|
announceReq chan readpublisher.AnnounceReq |
|
|
|
|
playReq chan readpublisher.PlayReq |
|
|
|
|
recordReq chan readpublisher.RecordReq |
|
|
|
|
pauseReq chan readpublisher.PauseReq |
|
|
|
|
removeReq chan readpublisher.RemoveReq |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// New allocates a Path.
|
|
|
|
|
func New( |
|
|
|
|
ctxParent context.Context, |
|
|
|
|
extSourceSetReady chan sourceExtSetReadyReq |
|
|
|
|
extSourceSetNotReady chan sourceExtSetNotReadyReq |
|
|
|
|
describeReq chan readPublisherDescribeReq |
|
|
|
|
setupPlayReq chan readPublisherSetupPlayReq |
|
|
|
|
announceReq chan readPublisherAnnounceReq |
|
|
|
|
playReq chan readPublisherPlayReq |
|
|
|
|
recordReq chan readPublisherRecordReq |
|
|
|
|
pauseReq chan readPublisherPauseReq |
|
|
|
|
removeReq chan readPublisherRemoveReq |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newPath( |
|
|
|
|
parentCtx context.Context, |
|
|
|
|
rtspAddress string, |
|
|
|
|
readTimeout time.Duration, |
|
|
|
|
writeTimeout time.Duration, |
|
|
|
@ -117,11 +145,11 @@ func New(
@@ -117,11 +145,11 @@ func New(
|
|
|
|
|
conf *conf.PathConf, |
|
|
|
|
name string, |
|
|
|
|
wg *sync.WaitGroup, |
|
|
|
|
stats *stats.Stats, |
|
|
|
|
parent Parent) *Path { |
|
|
|
|
ctx, ctxCancel := context.WithCancel(ctxParent) |
|
|
|
|
stats *stats, |
|
|
|
|
parent pathParent) *path { |
|
|
|
|
ctx, ctxCancel := context.WithCancel(parentCtx) |
|
|
|
|
|
|
|
|
|
pa := &Path{ |
|
|
|
|
pa := &path{ |
|
|
|
|
rtspAddress: rtspAddress, |
|
|
|
|
readTimeout: readTimeout, |
|
|
|
|
writeTimeout: writeTimeout, |
|
|
|
@ -135,21 +163,21 @@ func New(
@@ -135,21 +163,21 @@ func New(
|
|
|
|
|
parent: parent, |
|
|
|
|
ctx: ctx, |
|
|
|
|
ctxCancel: ctxCancel, |
|
|
|
|
readPublishers: make(map[readpublisher.ReadPublisher]readPublisherState), |
|
|
|
|
readPublishers: make(map[readPublisher]readPublisherState), |
|
|
|
|
nonRTSPReaders: newReadersMap(), |
|
|
|
|
describeTimer: newEmptyTimer(), |
|
|
|
|
sourceCloseTimer: newEmptyTimer(), |
|
|
|
|
runOnDemandCloseTimer: newEmptyTimer(), |
|
|
|
|
closeTimer: newEmptyTimer(), |
|
|
|
|
extSourceSetReady: make(chan source.ExtSetReadyReq), |
|
|
|
|
extSourceSetNotReady: make(chan source.ExtSetNotReadyReq), |
|
|
|
|
describeReq: make(chan readpublisher.DescribeReq), |
|
|
|
|
setupPlayReq: make(chan readpublisher.SetupPlayReq), |
|
|
|
|
announceReq: make(chan readpublisher.AnnounceReq), |
|
|
|
|
playReq: make(chan readpublisher.PlayReq), |
|
|
|
|
recordReq: make(chan readpublisher.RecordReq), |
|
|
|
|
pauseReq: make(chan readpublisher.PauseReq), |
|
|
|
|
removeReq: make(chan readpublisher.RemoveReq), |
|
|
|
|
extSourceSetReady: make(chan sourceExtSetReadyReq), |
|
|
|
|
extSourceSetNotReady: make(chan sourceExtSetNotReadyReq), |
|
|
|
|
describeReq: make(chan readPublisherDescribeReq), |
|
|
|
|
setupPlayReq: make(chan readPublisherSetupPlayReq), |
|
|
|
|
announceReq: make(chan readPublisherAnnounceReq), |
|
|
|
|
playReq: make(chan readPublisherPlayReq), |
|
|
|
|
recordReq: make(chan readPublisherRecordReq), |
|
|
|
|
pauseReq: make(chan readPublisherPauseReq), |
|
|
|
|
removeReq: make(chan readPublisherRemoveReq), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pa.wg.Add(1) |
|
|
|
@ -157,17 +185,16 @@ func New(
@@ -157,17 +185,16 @@ func New(
|
|
|
|
|
return pa |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close closes a path.
|
|
|
|
|
func (pa *Path) Close() { |
|
|
|
|
func (pa *path) Close() { |
|
|
|
|
pa.ctxCancel() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Log is the main logging function.
|
|
|
|
|
func (pa *Path) Log(level logger.Level, format string, args ...interface{}) { |
|
|
|
|
func (pa *path) Log(level logger.Level, format string, args ...interface{}) { |
|
|
|
|
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) run() { |
|
|
|
|
func (pa *path) run() { |
|
|
|
|
defer pa.wg.Done() |
|
|
|
|
|
|
|
|
|
if pa.conf.Source == "redirect" { |
|
|
|
@ -191,12 +218,12 @@ outer:
@@ -191,12 +218,12 @@ outer:
|
|
|
|
|
select { |
|
|
|
|
case <-pa.describeTimer.C: |
|
|
|
|
for _, req := range pa.describeRequests { |
|
|
|
|
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} |
|
|
|
|
req.Res <- readPublisherDescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} |
|
|
|
|
} |
|
|
|
|
pa.describeRequests = nil |
|
|
|
|
|
|
|
|
|
for _, req := range pa.setupPlayRequests { |
|
|
|
|
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} |
|
|
|
|
req.Res <- readPublisherSetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)} |
|
|
|
|
} |
|
|
|
|
pa.setupPlayRequests = nil |
|
|
|
|
|
|
|
|
@ -209,7 +236,7 @@ outer:
@@ -209,7 +236,7 @@ outer:
|
|
|
|
|
|
|
|
|
|
case <-pa.sourceCloseTimer.C: |
|
|
|
|
pa.sourceCloseTimerStarted = false |
|
|
|
|
pa.source.(source.ExtSource).Close() |
|
|
|
|
pa.source.(sourceExternal).Close() |
|
|
|
|
pa.source = nil |
|
|
|
|
|
|
|
|
|
pa.scheduleClose() |
|
|
|
@ -228,7 +255,7 @@ outer:
@@ -228,7 +255,7 @@ outer:
|
|
|
|
|
case req := <-pa.extSourceSetReady: |
|
|
|
|
pa.sourceStream = gortsplib.NewServerStream(req.Tracks) |
|
|
|
|
pa.onSourceSetReady() |
|
|
|
|
req.Res <- source.ExtSetReadyRes{} |
|
|
|
|
req.Res <- sourceExtSetReadyRes{} |
|
|
|
|
|
|
|
|
|
case req := <-pa.extSourceSetNotReady: |
|
|
|
|
pa.onSourceSetNotReady() |
|
|
|
@ -282,7 +309,7 @@ outer:
@@ -282,7 +309,7 @@ outer:
|
|
|
|
|
onInitCmd.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if source, ok := pa.source.(source.ExtSource); ok { |
|
|
|
|
if source, ok := pa.source.(sourceExternal); ok { |
|
|
|
|
source.Close() |
|
|
|
|
} |
|
|
|
|
pa.sourceWg.Wait() |
|
|
|
@ -293,11 +320,11 @@ outer:
@@ -293,11 +320,11 @@ outer:
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, req := range pa.describeRequests { |
|
|
|
|
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
req.Res <- readPublisherDescribeRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, req := range pa.setupPlayRequests { |
|
|
|
|
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
req.Res <- readPublisherSetupPlayRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for rp, state := range pa.readPublishers { |
|
|
|
@ -306,7 +333,7 @@ outer:
@@ -306,7 +333,7 @@ outer:
|
|
|
|
|
case readPublisherStatePlay: |
|
|
|
|
atomic.AddInt64(pa.stats.CountReaders, -1) |
|
|
|
|
|
|
|
|
|
if _, ok := rp.(rtspSession); !ok { |
|
|
|
|
if _, ok := rp.(pathRTSPSession); !ok { |
|
|
|
|
pa.nonRTSPReaders.remove(rp) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -320,16 +347,16 @@ outer:
@@ -320,16 +347,16 @@ outer:
|
|
|
|
|
pa.parent.OnPathClose(pa) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) hasExternalSource() bool { |
|
|
|
|
func (pa *path) hasExternalSource() bool { |
|
|
|
|
return strings.HasPrefix(pa.conf.Source, "rtsp://") || |
|
|
|
|
strings.HasPrefix(pa.conf.Source, "rtsps://") || |
|
|
|
|
strings.HasPrefix(pa.conf.Source, "rtmp://") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) startExternalSource() { |
|
|
|
|
func (pa *path) startExternalSource() { |
|
|
|
|
if strings.HasPrefix(pa.conf.Source, "rtsp://") || |
|
|
|
|
strings.HasPrefix(pa.conf.Source, "rtsps://") { |
|
|
|
|
pa.source = rtspsource.New( |
|
|
|
|
pa.source = newRTSPSource( |
|
|
|
|
pa.ctx, |
|
|
|
|
pa.conf.Source, |
|
|
|
|
pa.conf.SourceProtocolParsed, |
|
|
|
@ -343,7 +370,7 @@ func (pa *Path) startExternalSource() {
@@ -343,7 +370,7 @@ func (pa *Path) startExternalSource() {
|
|
|
|
|
pa.stats, |
|
|
|
|
pa) |
|
|
|
|
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") { |
|
|
|
|
pa.source = rtmpsource.New( |
|
|
|
|
pa.source = newRTMPSource( |
|
|
|
|
pa.ctx, |
|
|
|
|
pa.conf.Source, |
|
|
|
|
pa.readTimeout, |
|
|
|
@ -354,7 +381,7 @@ func (pa *Path) startExternalSource() {
@@ -354,7 +381,7 @@ func (pa *Path) startExternalSource() {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) hasReadPublishers() bool { |
|
|
|
|
func (pa *path) hasReadPublishers() bool { |
|
|
|
|
for _, state := range pa.readPublishers { |
|
|
|
|
if state != readPublisherStatePreRemove { |
|
|
|
|
return true |
|
|
|
@ -363,7 +390,7 @@ func (pa *Path) hasReadPublishers() bool {
@@ -363,7 +390,7 @@ func (pa *Path) hasReadPublishers() bool {
|
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) hasReadPublishersNotSources() bool { |
|
|
|
|
func (pa *path) hasReadPublishersNotSources() bool { |
|
|
|
|
for c, state := range pa.readPublishers { |
|
|
|
|
if state != readPublisherStatePreRemove && c != pa.source { |
|
|
|
|
return true |
|
|
|
@ -372,11 +399,11 @@ func (pa *Path) hasReadPublishersNotSources() bool {
@@ -372,11 +399,11 @@ func (pa *Path) hasReadPublishersNotSources() bool {
|
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state readPublisherState) { |
|
|
|
|
func (pa *path) addReadPublisher(c readPublisher, state readPublisherState) { |
|
|
|
|
pa.readPublishers[c] = state |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) { |
|
|
|
|
func (pa *path) removeReadPublisher(rp readPublisher) { |
|
|
|
|
state := pa.readPublishers[rp] |
|
|
|
|
pa.readPublishers[rp] = readPublisherStatePreRemove |
|
|
|
|
|
|
|
|
@ -384,7 +411,7 @@ func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) {
@@ -384,7 +411,7 @@ func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) {
|
|
|
|
|
case readPublisherStatePlay: |
|
|
|
|
atomic.AddInt64(pa.stats.CountReaders, -1) |
|
|
|
|
|
|
|
|
|
if _, ok := rp.(rtspSession); !ok { |
|
|
|
|
if _, ok := rp.(pathRTSPSession); !ok { |
|
|
|
|
pa.nonRTSPReaders.remove(rp) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -412,7 +439,7 @@ func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) {
@@ -412,7 +439,7 @@ func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) {
|
|
|
|
|
pa.scheduleClose() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onSourceSetReady() { |
|
|
|
|
func (pa *path) onSourceSetReady() { |
|
|
|
|
if pa.sourceState == sourceStateWaitingDescribe { |
|
|
|
|
pa.describeTimer.Stop() |
|
|
|
|
pa.describeTimer = newEmptyTimer() |
|
|
|
@ -421,7 +448,7 @@ func (pa *Path) onSourceSetReady() {
@@ -421,7 +448,7 @@ func (pa *Path) onSourceSetReady() {
|
|
|
|
|
pa.sourceState = sourceStateReady |
|
|
|
|
|
|
|
|
|
for _, req := range pa.describeRequests { |
|
|
|
|
req.Res <- readpublisher.DescribeRes{ |
|
|
|
|
req.Res <- readPublisherDescribeRes{ |
|
|
|
|
Stream: pa.sourceStream, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -437,7 +464,7 @@ func (pa *Path) onSourceSetReady() {
@@ -437,7 +464,7 @@ func (pa *Path) onSourceSetReady() {
|
|
|
|
|
pa.scheduleClose() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onSourceSetNotReady() { |
|
|
|
|
func (pa *path) onSourceSetNotReady() { |
|
|
|
|
pa.sourceState = sourceStateNotReady |
|
|
|
|
|
|
|
|
|
// close all readPublishers that are reading or waiting to read
|
|
|
|
@ -449,7 +476,7 @@ func (pa *Path) onSourceSetNotReady() {
@@ -449,7 +476,7 @@ func (pa *Path) onSourceSetNotReady() {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) fixedPublisherStart() { |
|
|
|
|
func (pa *path) fixedPublisherStart() { |
|
|
|
|
if pa.hasExternalSource() { |
|
|
|
|
// start on-demand source
|
|
|
|
|
if pa.source == nil { |
|
|
|
@ -490,12 +517,12 @@ func (pa *Path) fixedPublisherStart() {
@@ -490,12 +517,12 @@ func (pa *Path) fixedPublisherStart() {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) { |
|
|
|
|
func (pa *path) onReadPublisherDescribe(req readPublisherDescribeReq) { |
|
|
|
|
pa.fixedPublisherStart() |
|
|
|
|
pa.scheduleClose() |
|
|
|
|
|
|
|
|
|
if _, ok := pa.source.(*sourceRedirect); ok { |
|
|
|
|
req.Res <- readpublisher.DescribeRes{ |
|
|
|
|
req.Res <- readPublisherDescribeRes{ |
|
|
|
|
Redirect: pa.conf.SourceRedirect, |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
@ -503,7 +530,7 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
@@ -503,7 +530,7 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
|
|
|
|
|
|
|
|
|
|
switch pa.sourceState { |
|
|
|
|
case sourceStateReady: |
|
|
|
|
req.Res <- readpublisher.DescribeRes{ |
|
|
|
|
req.Res <- readPublisherDescribeRes{ |
|
|
|
|
Stream: pa.sourceStream, |
|
|
|
|
} |
|
|
|
|
return |
|
|
|
@ -526,16 +553,16 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
@@ -526,16 +553,16 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
|
|
|
|
|
} |
|
|
|
|
return pa.conf.Fallback |
|
|
|
|
}() |
|
|
|
|
req.Res <- readpublisher.DescribeRes{nil, fallbackURL, nil} //nolint:govet
|
|
|
|
|
req.Res <- readPublisherDescribeRes{nil, fallbackURL, nil} //nolint:govet
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
req.Res <- readpublisher.DescribeRes{Err: readpublisher.ErrNoOnePublishing{PathName: pa.name}} |
|
|
|
|
req.Res <- readPublisherDescribeRes{Err: readPublisherErrNoOnePublishing{PathName: pa.name}} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) { |
|
|
|
|
func (pa *path) onReadPublisherSetupPlay(req readPublisherSetupPlayReq) { |
|
|
|
|
pa.fixedPublisherStart() |
|
|
|
|
pa.scheduleClose() |
|
|
|
|
|
|
|
|
@ -549,12 +576,12 @@ func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
@@ -549,12 +576,12 @@ func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
case sourceStateNotReady: |
|
|
|
|
req.Res <- readpublisher.SetupPlayRes{Err: readpublisher.ErrNoOnePublishing{PathName: pa.name}} |
|
|
|
|
req.Res <- readPublisherSetupPlayRes{Err: readPublisherErrNoOnePublishing{PathName: pa.name}} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) { |
|
|
|
|
func (pa *path) onReadPublisherSetupPlayPost(req readPublisherSetupPlayReq) { |
|
|
|
|
if _, ok := pa.readPublishers[req.Author]; !ok { |
|
|
|
|
// prevent on-demand source from closing
|
|
|
|
|
if pa.sourceCloseTimerStarted { |
|
|
|
@ -571,42 +598,42 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) {
@@ -571,42 +598,42 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) {
|
|
|
|
|
pa.addReadPublisher(req.Author, readPublisherStatePrePlay) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
req.Res <- readpublisher.SetupPlayRes{ |
|
|
|
|
req.Res <- readPublisherSetupPlayRes{ |
|
|
|
|
Path: pa, |
|
|
|
|
Stream: pa.sourceStream, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) { |
|
|
|
|
func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) { |
|
|
|
|
atomic.AddInt64(pa.stats.CountReaders, 1) |
|
|
|
|
pa.readPublishers[req.Author] = readPublisherStatePlay |
|
|
|
|
|
|
|
|
|
if _, ok := req.Author.(rtspSession); !ok { |
|
|
|
|
if _, ok := req.Author.(pathRTSPSession); !ok { |
|
|
|
|
pa.nonRTSPReaders.add(req.Author) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
req.Res <- readpublisher.PlayRes{} |
|
|
|
|
req.Res <- readPublisherPlayRes{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { |
|
|
|
|
func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) { |
|
|
|
|
if _, ok := pa.readPublishers[req.Author]; ok { |
|
|
|
|
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("already publishing or reading")} |
|
|
|
|
req.Res <- readPublisherAnnounceRes{Err: fmt.Errorf("already publishing or reading")} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if pa.hasExternalSource() { |
|
|
|
|
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("path '%s' is assigned to an external source", pa.name)} |
|
|
|
|
req.Res <- readPublisherAnnounceRes{Err: fmt.Errorf("path '%s' is assigned to an external source", pa.name)} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if pa.source != nil { |
|
|
|
|
if pa.conf.DisablePublisherOverride { |
|
|
|
|
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("another client is already publishing on path '%s'", pa.name)} |
|
|
|
|
req.Res <- readPublisherAnnounceRes{Err: fmt.Errorf("another client is already publishing on path '%s'", pa.name)} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pa.Log(logger.Info, "closing existing publisher") |
|
|
|
|
curPublisher := pa.source.(readpublisher.ReadPublisher) |
|
|
|
|
curPublisher := pa.source.(readPublisher) |
|
|
|
|
pa.removeReadPublisher(curPublisher) |
|
|
|
|
curPublisher.Close() |
|
|
|
|
|
|
|
|
@ -622,12 +649,12 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
@@ -622,12 +649,12 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
|
|
|
|
|
|
|
|
|
|
pa.source = req.Author |
|
|
|
|
pa.sourceStream = gortsplib.NewServerStream(req.Tracks) |
|
|
|
|
req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet
|
|
|
|
|
req.Res <- readPublisherAnnounceRes{pa, nil} //nolint:govet
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) { |
|
|
|
|
func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) { |
|
|
|
|
if state, ok := pa.readPublishers[req.Author]; !ok || state != readPublisherStatePreRecord { |
|
|
|
|
req.Res <- readpublisher.RecordRes{Err: fmt.Errorf("not recording anymore")} |
|
|
|
|
req.Res <- readPublisherRecordRes{Err: fmt.Errorf("not recording anymore")} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -635,10 +662,10 @@ func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) {
@@ -635,10 +662,10 @@ func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) {
|
|
|
|
|
pa.readPublishers[req.Author] = readPublisherStateRecord |
|
|
|
|
pa.onSourceSetReady() |
|
|
|
|
|
|
|
|
|
req.Res <- readpublisher.RecordRes{} |
|
|
|
|
req.Res <- readPublisherRecordRes{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) { |
|
|
|
|
func (pa *path) onReadPublisherPause(req readPublisherPauseReq) { |
|
|
|
|
state, ok := pa.readPublishers[req.Author] |
|
|
|
|
if !ok { |
|
|
|
|
close(req.Res) |
|
|
|
@ -649,7 +676,7 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
@@ -649,7 +676,7 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
|
|
|
|
|
atomic.AddInt64(pa.stats.CountReaders, -1) |
|
|
|
|
pa.readPublishers[req.Author] = readPublisherStatePrePlay |
|
|
|
|
|
|
|
|
|
if _, ok := req.Author.(rtspSession); !ok { |
|
|
|
|
if _, ok := req.Author.(pathRTSPSession); !ok { |
|
|
|
|
pa.nonRTSPReaders.remove(req.Author) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -662,7 +689,7 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
@@ -662,7 +689,7 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
|
|
|
|
|
close(req.Res) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) scheduleSourceClose() { |
|
|
|
|
func (pa *path) scheduleSourceClose() { |
|
|
|
|
if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -678,7 +705,7 @@ func (pa *Path) scheduleSourceClose() {
@@ -678,7 +705,7 @@ func (pa *Path) scheduleSourceClose() {
|
|
|
|
|
pa.sourceCloseTimerStarted = true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) scheduleRunOnDemandClose() { |
|
|
|
|
func (pa *path) scheduleRunOnDemandClose() { |
|
|
|
|
if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -694,7 +721,7 @@ func (pa *Path) scheduleRunOnDemandClose() {
@@ -694,7 +721,7 @@ func (pa *Path) scheduleRunOnDemandClose() {
|
|
|
|
|
pa.runOnDemandCloseTimerStarted = true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *Path) scheduleClose() { |
|
|
|
|
func (pa *path) scheduleClose() { |
|
|
|
|
if pa.conf.Regexp != nil && |
|
|
|
|
!pa.hasReadPublishers() && |
|
|
|
|
pa.source == nil && |
|
|
|
@ -710,22 +737,22 @@ func (pa *Path) scheduleClose() {
@@ -710,22 +737,22 @@ func (pa *Path) scheduleClose() {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ConfName returns the configuration name of this path.
|
|
|
|
|
func (pa *Path) ConfName() string { |
|
|
|
|
func (pa *path) ConfName() string { |
|
|
|
|
return pa.confName |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Conf returns the configuration of this path.
|
|
|
|
|
func (pa *Path) Conf() *conf.PathConf { |
|
|
|
|
func (pa *path) Conf() *conf.PathConf { |
|
|
|
|
return pa.conf |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Name returns the name of this path.
|
|
|
|
|
func (pa *Path) Name() string { |
|
|
|
|
func (pa *path) Name() string { |
|
|
|
|
return pa.name |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnExtSourceSetReady is called by an external source.
|
|
|
|
|
func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) { |
|
|
|
|
// OnsourceExternalSetReady is called by an external source.
|
|
|
|
|
func (pa *path) OnsourceExternalSetReady(req sourceExtSetReadyReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.extSourceSetReady <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -733,8 +760,8 @@ func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) {
@@ -733,8 +760,8 @@ func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnExtSourceSetNotReady is called by an external source.
|
|
|
|
|
func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) { |
|
|
|
|
// OnsourceExternalSetNotReady is called by an external source.
|
|
|
|
|
func (pa *path) OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.extSourceSetNotReady <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -743,34 +770,34 @@ func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) {
@@ -743,34 +770,34 @@ func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnPathManDescribe is called by pathman.PathMan.
|
|
|
|
|
func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) { |
|
|
|
|
func (pa *path) OnPathManDescribe(req readPublisherDescribeReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.describeReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
|
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
req.Res <- readPublisherDescribeRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnPathManSetupPlay is called by pathman.PathMan.
|
|
|
|
|
func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) { |
|
|
|
|
func (pa *path) OnPathManSetupPlay(req readPublisherSetupPlayReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.setupPlayReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
|
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
req.Res <- readPublisherSetupPlayRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnPathManAnnounce is called by pathman.PathMan.
|
|
|
|
|
func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) { |
|
|
|
|
func (pa *path) OnPathManAnnounce(req readPublisherAnnounceReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.announceReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
|
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
req.Res <- readPublisherAnnounceRes{Err: fmt.Errorf("terminated")} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnReadPublisherPlay is called by a readpublisher.
|
|
|
|
|
func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) { |
|
|
|
|
// OnReadPublisherPlay is called by a readPublisher
|
|
|
|
|
func (pa *path) OnReadPublisherPlay(req readPublisherPlayReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.playReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -778,8 +805,8 @@ func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) {
@@ -778,8 +805,8 @@ func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnReadPublisherRecord is called by a readpublisher.
|
|
|
|
|
func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) { |
|
|
|
|
// OnReadPublisherRecord is called by a readPublisher
|
|
|
|
|
func (pa *path) OnReadPublisherRecord(req readPublisherRecordReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.recordReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -787,8 +814,8 @@ func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) {
@@ -787,8 +814,8 @@ func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnReadPublisherPause is called by a readpublisher.
|
|
|
|
|
func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) { |
|
|
|
|
// OnReadPublisherPause is called by a readPublisher
|
|
|
|
|
func (pa *path) OnReadPublisherPause(req readPublisherPauseReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.pauseReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -796,8 +823,8 @@ func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) {
@@ -796,8 +823,8 @@ func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnReadPublisherRemove is called by a readpublisher.
|
|
|
|
|
func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) { |
|
|
|
|
// OnReadPublisherRemove is called by a readPublisher
|
|
|
|
|
func (pa *path) OnReadPublisherRemove(req readPublisherRemoveReq) { |
|
|
|
|
select { |
|
|
|
|
case pa.removeReq <- req: |
|
|
|
|
case <-pa.ctx.Done(): |
|
|
|
@ -806,7 +833,7 @@ func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
@@ -806,7 +833,7 @@ func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// OnFrame is called by a readpublisher
|
|
|
|
|
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { |
|
|
|
|
func (pa *path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { |
|
|
|
|
pa.sourceStream.WriteFrame(trackID, streamType, payload) |
|
|
|
|
|
|
|
|
|
pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload) |