diff --git a/internal/core/path.go b/internal/core/path.go index 21b92cc2..0fe63f41 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -275,7 +275,6 @@ func newPath( func (pa *path) close() { pa.ctxCancel() - pa.log(logger.Info, "closed") } // Log is the main logging function. @@ -317,113 +316,106 @@ func (pa *path) run() { }) } -outer: - for { - select { - case <-pa.onDemandReadyTimer.C: - for _, req := range pa.describeRequests { - req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.describeRequests = nil - - for _, req := range pa.setupPlayRequests { - req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.setupPlayRequests = nil + err := func() error { + for { + select { + case <-pa.onDemandReadyTimer.C: + for _, req := range pa.describeRequests { + req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.describeRequests = nil - pa.onDemandCloseSource() + for _, req := range pa.setupPlayRequests { + req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.setupPlayRequests = nil - if pa.conf.Regexp != nil { - break outer - } + pa.onDemandCloseSource() - case <-pa.onDemandCloseTimer.C: - pa.onDemandCloseSource() + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - if pa.conf.Regexp != nil { - break outer - } + case <-pa.onDemandCloseTimer.C: + pa.onDemandCloseSource() - case req := <-pa.sourceStaticSetReady: - if req.Source == pa.source { - pa.sourceSetReady(req.Tracks) - req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream} - } else { - req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")} - } + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - case req := <-pa.sourceStaticSetNotReady: - if req.Source == pa.source { - if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { - pa.onDemandCloseSource() + case req := <-pa.sourceStaticSetReady: + if req.Source == pa.source { + pa.sourceSetReady(req.Tracks) + req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream} } else { - pa.sourceSetNotReady() + req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")} } - } - close(req.Res) - if pa.source == nil && pa.conf.Regexp != nil { - break outer - } + case req := <-pa.sourceStaticSetNotReady: + if req.Source == pa.source { + if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { + pa.onDemandCloseSource() + } else { + pa.sourceSetNotReady() + } + } + close(req.Res) - case req := <-pa.describe: - pa.handleDescribe(req) + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - if pa.conf.Regexp != nil && - pa.source == nil && - len(pa.readers) == 0 && - len(pa.describeRequests) == 0 && - len(pa.setupPlayRequests) == 0 { - break outer - } + case req := <-pa.describe: + pa.handleDescribe(req) - case req := <-pa.publisherRemove: - pa.handlePublisherRemove(req) + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - if pa.source == nil && pa.conf.Regexp != nil { - break outer - } + case req := <-pa.publisherRemove: + pa.handlePublisherRemove(req) - case req := <-pa.publisherAnnounce: - pa.handlePublisherAnnounce(req) + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - case req := <-pa.publisherRecord: - pa.handlePublisherRecord(req) + case req := <-pa.publisherAnnounce: + pa.handlePublisherAnnounce(req) - case req := <-pa.publisherPause: - pa.handlePublisherPause(req) + case req := <-pa.publisherRecord: + pa.handlePublisherRecord(req) - if pa.source == nil && pa.conf.Regexp != nil { - break outer - } + case req := <-pa.publisherPause: + pa.handlePublisherPause(req) - case req := <-pa.readerRemove: - pa.handleReaderRemove(req) + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - case req := <-pa.readerSetupPlay: - pa.handleReaderSetupPlay(req) + case req := <-pa.readerRemove: + pa.handleReaderRemove(req) - if pa.conf.Regexp != nil && - pa.source == nil && - len(pa.readers) == 0 && - len(pa.describeRequests) == 0 && - len(pa.setupPlayRequests) == 0 { - break outer - } + case req := <-pa.readerSetupPlay: + pa.handleReaderSetupPlay(req) - case req := <-pa.readerPlay: - pa.handleReaderPlay(req) + if pa.shouldClose() { + return fmt.Errorf("not in use") + } - case req := <-pa.readerPause: - pa.handleReaderPause(req) + case req := <-pa.readerPlay: + pa.handleReaderPlay(req) - case req := <-pa.apiPathsList: - pa.handleAPIPathsList(req) + case req := <-pa.readerPause: + pa.handleReaderPause(req) - case <-pa.ctx.Done(): - break outer + case req := <-pa.apiPathsList: + pa.handleAPIPathsList(req) + + case <-pa.ctx.Done(): + return fmt.Errorf("terminated") + } } - } + }() pa.ctxCancel() @@ -470,9 +462,19 @@ outer: pa.log(logger.Info, "runOnDemand command stopped") } + pa.log(logger.Info, "closed (%v)", err) + pa.parent.onPathClose(pa) } +func (pa *path) shouldClose() bool { + return pa.conf.Regexp != nil && + pa.source == nil && + len(pa.readers) == 0 && + len(pa.describeRequests) == 0 && + len(pa.setupPlayRequests) == 0 +} + func (pa *path) hasStaticSource() bool { return strings.HasPrefix(pa.conf.Source, "rtsp://") || strings.HasPrefix(pa.conf.Source, "rtsps://") ||