|
|
|
@ -66,7 +66,11 @@ func (pa *path) check() {
@@ -66,7 +66,11 @@ func (pa *path) check() {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if source, ok := pa.publisher.(*source); ok { |
|
|
|
|
if source.state == sourceStateRunning { |
|
|
|
|
// stop on demand source if needed
|
|
|
|
|
if pa.confp.SourceOnDemand && |
|
|
|
|
source.state == sourceStateRunning && |
|
|
|
|
time.Since(pa.lastRequested) >= 10*time.Second { |
|
|
|
|
|
|
|
|
|
hasClients := func() bool { |
|
|
|
|
for c := range pa.p.clients { |
|
|
|
|
if c.pathId == pa.id { |
|
|
|
@ -75,10 +79,7 @@ func (pa *path) check() {
@@ -75,10 +79,7 @@ func (pa *path) check() {
|
|
|
|
|
} |
|
|
|
|
return false |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// stop source if needed
|
|
|
|
|
if !hasClients && |
|
|
|
|
time.Since(pa.lastRequested) >= 10*time.Second { |
|
|
|
|
if !hasClients { |
|
|
|
|
source.log("stopping since we're not requested anymore") |
|
|
|
|
source.state = sourceStateStopped |
|
|
|
|
source.events <- sourceEventApplyState{source.state} |
|
|
|
@ -86,7 +87,10 @@ func (pa *path) check() {
@@ -86,7 +87,10 @@ func (pa *path) check() {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
if pa.onDemandCmd != nil { |
|
|
|
|
// stop on demand command if needed
|
|
|
|
|
if pa.onDemandCmd != nil && |
|
|
|
|
time.Since(pa.lastRequested) >= 10*time.Second { |
|
|
|
|
|
|
|
|
|
hasClientReaders := func() bool { |
|
|
|
|
for c := range pa.p.clients { |
|
|
|
|
if c.pathId == pa.id && c != pa.publisher { |
|
|
|
@ -95,12 +99,8 @@ func (pa *path) check() {
@@ -95,12 +99,8 @@ func (pa *path) check() {
|
|
|
|
|
} |
|
|
|
|
return false |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// stop on demand command if needed
|
|
|
|
|
if !hasClientReaders && |
|
|
|
|
time.Since(pa.lastRequested) >= 10*time.Second { |
|
|
|
|
if !hasClientReaders { |
|
|
|
|
pa.p.log("stopping on demand command since it is not requested anymore") |
|
|
|
|
|
|
|
|
|
pa.onDemandCmd.Process.Signal(os.Interrupt) |
|
|
|
|
pa.onDemandCmd.Wait() |
|
|
|
|
pa.onDemandCmd = nil |
|
|
|
@ -115,6 +115,7 @@ func (pa *path) describe(client *client) {
@@ -115,6 +115,7 @@ func (pa *path) describe(client *client) {
|
|
|
|
|
// publisher not found
|
|
|
|
|
if pa.publisher == nil { |
|
|
|
|
if pa.confp.RunOnDemand != "" { |
|
|
|
|
// start on demand command if needed; put the client on hold
|
|
|
|
|
if pa.onDemandCmd == nil { |
|
|
|
|
pa.p.log("starting on demand command") |
|
|
|
|
|
|
|
|
@ -131,11 +132,11 @@ func (pa *path) describe(client *client) {
@@ -131,11 +132,11 @@ func (pa *path) describe(client *client) {
|
|
|
|
|
client.pathId = pa.id |
|
|
|
|
client.state = clientStateWaitingDescription |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.id)} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// no on-demand: reply with 404
|
|
|
|
|
client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.id)} |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// publisher was found but is not ready: put the client on hold
|
|
|
|
|