|
|
|
@ -314,7 +314,7 @@ func (pa *path) run() {
@@ -314,7 +314,7 @@ func (pa *path) run() {
|
|
|
|
|
pa) |
|
|
|
|
|
|
|
|
|
if !pa.conf.SourceOnDemand { |
|
|
|
|
pa.source.(*sourceStatic).start() |
|
|
|
|
pa.source.(*sourceStatic).start(false) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -362,7 +362,9 @@ func (pa *path) run() {
@@ -362,7 +362,9 @@ func (pa *path) run() {
|
|
|
|
|
|
|
|
|
|
if pa.source != nil { |
|
|
|
|
if source, ok := pa.source.(*sourceStatic); ok { |
|
|
|
|
source.close() |
|
|
|
|
if !pa.conf.SourceOnDemand || pa.onDemandStaticSourceState != pathOnDemandStateInitial { |
|
|
|
|
source.close("path is closing") |
|
|
|
|
} |
|
|
|
|
} else if source, ok := pa.source.(publisher); ok { |
|
|
|
|
source.close() |
|
|
|
|
} |
|
|
|
@ -373,7 +375,7 @@ func (pa *path) run() {
@@ -373,7 +375,7 @@ func (pa *path) run() {
|
|
|
|
|
pa.Log(logger.Info, "runOnDemand command stopped") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pa.Log(logger.Debug, "destroyed (%v)", err) |
|
|
|
|
pa.Log(logger.Debug, "destroyed: %v", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) runInner() error { |
|
|
|
@ -477,12 +479,12 @@ func (pa *path) doOnDemandStaticSourceReadyTimer() {
@@ -477,12 +479,12 @@ func (pa *path) doOnDemandStaticSourceReadyTimer() {
|
|
|
|
|
} |
|
|
|
|
pa.readerAddRequestsOnHold = nil |
|
|
|
|
|
|
|
|
|
pa.onDemandStaticSourceStop() |
|
|
|
|
pa.onDemandStaticSourceStop("timed out") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) doOnDemandStaticSourceCloseTimer() { |
|
|
|
|
pa.setNotReady() |
|
|
|
|
pa.onDemandStaticSourceStop() |
|
|
|
|
pa.onDemandStaticSourceStop("not needed by anyone") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) doOnDemandPublisherReadyTimer() { |
|
|
|
@ -496,11 +498,11 @@ func (pa *path) doOnDemandPublisherReadyTimer() {
@@ -496,11 +498,11 @@ func (pa *path) doOnDemandPublisherReadyTimer() {
|
|
|
|
|
} |
|
|
|
|
pa.readerAddRequestsOnHold = nil |
|
|
|
|
|
|
|
|
|
pa.onDemandStopPublisher() |
|
|
|
|
pa.onDemandPublisherStop("timed out") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) doOnDemandPublisherCloseTimer() { |
|
|
|
|
pa.onDemandStopPublisher() |
|
|
|
|
pa.onDemandPublisherStop("not needed by anyone") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) doReloadConf(newConf *conf.PathConf) { |
|
|
|
@ -550,7 +552,7 @@ func (pa *path) doSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
@@ -550,7 +552,7 @@ func (pa *path) doSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
|
|
|
|
|
close(req.res) |
|
|
|
|
|
|
|
|
|
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { |
|
|
|
|
pa.onDemandStaticSourceStop() |
|
|
|
|
pa.onDemandStaticSourceStop("an error occurred") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -579,7 +581,7 @@ func (pa *path) doDescribe(req pathDescribeReq) {
@@ -579,7 +581,7 @@ func (pa *path) doDescribe(req pathDescribeReq) {
|
|
|
|
|
|
|
|
|
|
if pa.conf.HasOnDemandPublisher() { |
|
|
|
|
if pa.onDemandPublisherState == pathOnDemandStateInitial { |
|
|
|
|
pa.onDemandStartPublisher() |
|
|
|
|
pa.onDemandPublisherStart() |
|
|
|
|
} |
|
|
|
|
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) |
|
|
|
|
return |
|
|
|
@ -697,7 +699,7 @@ func (pa *path) doAddReader(req pathAddReaderReq) {
@@ -697,7 +699,7 @@ func (pa *path) doAddReader(req pathAddReaderReq) {
|
|
|
|
|
|
|
|
|
|
if pa.conf.HasOnDemandPublisher() { |
|
|
|
|
if pa.onDemandPublisherState == pathOnDemandStateInitial { |
|
|
|
|
pa.onDemandStartPublisher() |
|
|
|
|
pa.onDemandPublisherStart() |
|
|
|
|
} |
|
|
|
|
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req) |
|
|
|
|
return |
|
|
|
@ -790,7 +792,7 @@ func (pa *path) externalCmdEnv() externalcmd.Environment {
@@ -790,7 +792,7 @@ func (pa *path) externalCmdEnv() externalcmd.Environment {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) onDemandStaticSourceStart() { |
|
|
|
|
pa.source.(*sourceStatic).start() |
|
|
|
|
pa.source.(*sourceStatic).start(true) |
|
|
|
|
|
|
|
|
|
pa.onDemandStaticSourceReadyTimer.Stop() |
|
|
|
|
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) |
|
|
|
@ -805,7 +807,7 @@ func (pa *path) onDemandStaticSourceScheduleClose() {
@@ -805,7 +807,7 @@ func (pa *path) onDemandStaticSourceScheduleClose() {
|
|
|
|
|
pa.onDemandStaticSourceState = pathOnDemandStateClosing |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) onDemandStaticSourceStop() { |
|
|
|
|
func (pa *path) onDemandStaticSourceStop(reason string) { |
|
|
|
|
if pa.onDemandStaticSourceState == pathOnDemandStateClosing { |
|
|
|
|
pa.onDemandStaticSourceCloseTimer.Stop() |
|
|
|
|
pa.onDemandStaticSourceCloseTimer = newEmptyTimer() |
|
|
|
@ -813,10 +815,10 @@ func (pa *path) onDemandStaticSourceStop() {
@@ -813,10 +815,10 @@ func (pa *path) onDemandStaticSourceStop() {
|
|
|
|
|
|
|
|
|
|
pa.onDemandStaticSourceState = pathOnDemandStateInitial |
|
|
|
|
|
|
|
|
|
pa.source.(*sourceStatic).stop() |
|
|
|
|
pa.source.(*sourceStatic).stop(reason) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) onDemandStartPublisher() { |
|
|
|
|
func (pa *path) onDemandPublisherStart() { |
|
|
|
|
pa.Log(logger.Info, "runOnDemand command started") |
|
|
|
|
pa.onDemandCmd = externalcmd.NewCmd( |
|
|
|
|
pa.externalCmdPool, |
|
|
|
@ -840,7 +842,7 @@ func (pa *path) onDemandPublisherScheduleClose() {
@@ -840,7 +842,7 @@ func (pa *path) onDemandPublisherScheduleClose() {
|
|
|
|
|
pa.onDemandPublisherState = pathOnDemandStateClosing |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (pa *path) onDemandStopPublisher() { |
|
|
|
|
func (pa *path) onDemandPublisherStop(reason string) { |
|
|
|
|
if pa.source != nil { |
|
|
|
|
pa.source.(publisher).close() |
|
|
|
|
pa.executeRemovePublisher() |
|
|
|
@ -856,7 +858,7 @@ func (pa *path) onDemandStopPublisher() {
@@ -856,7 +858,7 @@ func (pa *path) onDemandStopPublisher() {
|
|
|
|
|
if pa.onDemandCmd != nil { |
|
|
|
|
pa.onDemandCmd.Close() |
|
|
|
|
pa.onDemandCmd = nil |
|
|
|
|
pa.Log(logger.Info, "runOnDemand command stopped") |
|
|
|
|
pa.Log(logger.Info, "runOnDemand command stopped: %s", reason) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|