Browse Source

log the exact reason why a path is closed

pull/707/head
aler9 4 years ago
parent
commit
f801a9fa39
  1. 168
      internal/core/path.go

168
internal/core/path.go

@ -275,7 +275,6 @@ func newPath(
func (pa *path) close() { func (pa *path) close() {
pa.ctxCancel() pa.ctxCancel()
pa.log(logger.Info, "closed")
} }
// Log is the main logging function. // Log is the main logging function.
@ -317,113 +316,106 @@ func (pa *path) run() {
}) })
} }
outer: err := func() error {
for { for {
select { select {
case <-pa.onDemandReadyTimer.C: case <-pa.onDemandReadyTimer.C:
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)} req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
} }
pa.describeRequests = nil pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
pa.onDemandCloseSource() for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
if pa.conf.Regexp != nil { pa.onDemandCloseSource()
break outer
}
case <-pa.onDemandCloseTimer.C: if pa.shouldClose() {
pa.onDemandCloseSource() return fmt.Errorf("not in use")
}
if pa.conf.Regexp != nil { case <-pa.onDemandCloseTimer.C:
break outer pa.onDemandCloseSource()
}
case req := <-pa.sourceStaticSetReady: if pa.shouldClose() {
if req.Source == pa.source { return fmt.Errorf("not in use")
pa.sourceSetReady(req.Tracks) }
req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream}
} else {
req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
}
case req := <-pa.sourceStaticSetNotReady: case req := <-pa.sourceStaticSetReady:
if req.Source == pa.source { if req.Source == pa.source {
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { pa.sourceSetReady(req.Tracks)
pa.onDemandCloseSource() req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream}
} else { } else {
pa.sourceSetNotReady() req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
} }
}
close(req.Res)
if pa.source == nil && pa.conf.Regexp != nil { case req := <-pa.sourceStaticSetNotReady:
break outer if req.Source == pa.source {
} if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
}
close(req.Res)
case req := <-pa.describe: if pa.shouldClose() {
pa.handleDescribe(req) return fmt.Errorf("not in use")
}
if pa.conf.Regexp != nil && case req := <-pa.describe:
pa.source == nil && pa.handleDescribe(req)
len(pa.readers) == 0 &&
len(pa.describeRequests) == 0 &&
len(pa.setupPlayRequests) == 0 {
break outer
}
case req := <-pa.publisherRemove: if pa.shouldClose() {
pa.handlePublisherRemove(req) return fmt.Errorf("not in use")
}
if pa.source == nil && pa.conf.Regexp != nil { case req := <-pa.publisherRemove:
break outer pa.handlePublisherRemove(req)
}
case req := <-pa.publisherAnnounce: if pa.shouldClose() {
pa.handlePublisherAnnounce(req) return fmt.Errorf("not in use")
}
case req := <-pa.publisherRecord: case req := <-pa.publisherAnnounce:
pa.handlePublisherRecord(req) pa.handlePublisherAnnounce(req)
case req := <-pa.publisherPause: case req := <-pa.publisherRecord:
pa.handlePublisherPause(req) pa.handlePublisherRecord(req)
if pa.source == nil && pa.conf.Regexp != nil { case req := <-pa.publisherPause:
break outer pa.handlePublisherPause(req)
}
case req := <-pa.readerRemove: if pa.shouldClose() {
pa.handleReaderRemove(req) return fmt.Errorf("not in use")
}
case req := <-pa.readerSetupPlay: case req := <-pa.readerRemove:
pa.handleReaderSetupPlay(req) pa.handleReaderRemove(req)
if pa.conf.Regexp != nil && case req := <-pa.readerSetupPlay:
pa.source == nil && pa.handleReaderSetupPlay(req)
len(pa.readers) == 0 &&
len(pa.describeRequests) == 0 &&
len(pa.setupPlayRequests) == 0 {
break outer
}
case req := <-pa.readerPlay: if pa.shouldClose() {
pa.handleReaderPlay(req) return fmt.Errorf("not in use")
}
case req := <-pa.readerPause: case req := <-pa.readerPlay:
pa.handleReaderPause(req) pa.handleReaderPlay(req)
case req := <-pa.apiPathsList: case req := <-pa.readerPause:
pa.handleAPIPathsList(req) pa.handleReaderPause(req)
case <-pa.ctx.Done(): case req := <-pa.apiPathsList:
break outer pa.handleAPIPathsList(req)
case <-pa.ctx.Done():
return fmt.Errorf("terminated")
}
} }
} }()
pa.ctxCancel() pa.ctxCancel()
@ -470,9 +462,19 @@ outer:
pa.log(logger.Info, "runOnDemand command stopped") pa.log(logger.Info, "runOnDemand command stopped")
} }
pa.log(logger.Info, "closed (%v)", err)
pa.parent.onPathClose(pa) pa.parent.onPathClose(pa)
} }
func (pa *path) shouldClose() bool {
return pa.conf.Regexp != nil &&
pa.source == nil &&
len(pa.readers) == 0 &&
len(pa.describeRequests) == 0 &&
len(pa.setupPlayRequests) == 0
}
func (pa *path) hasStaticSource() bool { func (pa *path) hasStaticSource() bool {
return strings.HasPrefix(pa.conf.Source, "rtsp://") || return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") || strings.HasPrefix(pa.conf.Source, "rtsps://") ||

Loading…
Cancel
Save