Browse Source

api: return static sources in paths/list even if sourceOnDemand is true

pull/1088/head
aler9 3 years ago
parent
commit
7067c02030
  1. 185
      internal/core/api_test.go
  2. 10
      internal/core/hls_source.go
  3. 114
      internal/core/path.go
  4. 10
      internal/core/rtmp_source.go
  5. 10
      internal/core/rtsp_source.go
  6. 142
      internal/core/source_static.go

185
internal/core/api_test.go

@ -156,71 +156,172 @@ func TestAPIConfigPathsRemove(t *testing.T) { @@ -156,71 +156,172 @@ func TestAPIConfigPathsRemove(t *testing.T) {
}
func TestAPIPathsList(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("api: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" mypath:\n")
require.Equal(t, true, ok)
defer p.close()
type pathSource struct {
Type string `json:"type"`
}
var out struct {
Items map[string]struct {
Source struct {
Type string `json:"type"`
} `json:"source"`
} `json:"items"`
type path struct {
SourceReady bool
Source pathSource `json:"source"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
_, ok = out.Items["mypath"]
require.Equal(t, true, ok)
track := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
type pathList struct {
Items map[string]path `json:"items"`
}
func() {
source := gortsplib.Client{}
t.Run("rtsp session", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n")
require.Equal(t, true, ok)
defer p.close()
track := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
}
err = source.StartPublishing("rtsp://localhost:8554/mypath",
source := gortsplib.Client{}
err := source.StartPublishing("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
var out pathList
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
require.Equal(t, "rtspSession", out.Items["mypath"].Source.Type)
}()
require.Equal(t, pathList{
Items: map[string]path{
"mypath": {
SourceReady: true,
Source: pathSource{
Type: "rtspSession",
},
},
},
}, out)
})
t.Run("rtsps session", func(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
func() {
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("api: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" mypath:\n")
require.Equal(t, true, ok)
defer p.close()
track := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
}
err := source.StartPublishing("rtsps://localhost:8322/mypath",
source := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source.StartPublishing("rtsps://localhost:8322/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
var out pathList
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
require.Equal(t, "rtspsSession", out.Items["mypath"].Source.Type)
}()
require.Equal(t, pathList{
Items: map[string]path{
"mypath": {
SourceReady: true,
Source: pathSource{
Type: "rtspsSession",
},
},
},
}, out)
})
t.Run("rtsp source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: rtsp://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
var out pathList
err := httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
require.Equal(t, pathList{
Items: map[string]path{
"mypath": {
SourceReady: false,
Source: pathSource{
Type: "rtspSource",
},
},
},
}, out)
})
t.Run("rtmp source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: rtmp://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
var out pathList
err := httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
require.Equal(t, pathList{
Items: map[string]path{
"mypath": {
SourceReady: false,
Source: pathSource{
Type: "rtmpSource",
},
},
},
}, out)
})
t.Run("hls source", func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" mypath:\n" +
" source: http://127.0.0.1:1234/mypath\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
var out pathList
err := httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
require.Equal(t, pathList{
Items: map[string]path{
"mypath": {
SourceReady: false,
Source: pathSource{
Type: "hlsSource",
},
},
},
}, out)
})
}
func TestAPIList(t *testing.T) {
func TestAPIProtocolSpecificList(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)

10
internal/core/hls_source.go

@ -15,8 +15,8 @@ import ( @@ -15,8 +15,8 @@ import (
type hlsSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type hlsSource struct {
@ -51,7 +51,7 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -51,7 +51,7 @@ func (s *hlsSource) run(ctx context.Context) error {
defer func() {
if stream != nil {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}
}()
@ -78,9 +78,7 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -78,9 +78,7 @@ func (s *hlsSource) run(ctx context.Context) error {
tracks = append(tracks, audioTrack)
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
tracks: tracks,
})
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil {
return res.err
}

114
internal/core/path.go

@ -92,14 +92,12 @@ type pathSourceStaticSetReadyRes struct { @@ -92,14 +92,12 @@ type pathSourceStaticSetReadyRes struct {
}
type pathSourceStaticSetReadyReq struct {
source source
tracks gortsplib.Tracks
res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
source source
res chan struct{}
res chan struct{}
}
type pathReaderRemoveReq struct {
@ -221,7 +219,6 @@ type path struct { @@ -221,7 +219,6 @@ type path struct {
ctxCancel func()
source source
sourceReady bool
sourceStaticWg sync.WaitGroup
stream *stream
readers map[reader]pathReaderState
describeRequestsOnHold []pathDescribeReq
@ -352,8 +349,20 @@ func (pa *path) run() { @@ -352,8 +349,20 @@ func (pa *path) run() {
if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{}
} else if !pa.conf.SourceOnDemand && pa.hasStaticSource() {
pa.staticSourceCreate()
} else if pa.hasStaticSource() {
pa.source = newSourceStatic(
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
pa)
if !pa.conf.SourceOnDemand {
pa.source.(*sourceStatic).start()
}
}
var onInitCmd *externalcmd.Cmd
@ -422,40 +431,36 @@ func (pa *path) run() { @@ -422,40 +431,36 @@ func (pa *path) run() {
}
case req := <-pa.sourceStaticSetReady:
if req.source == pa.source {
pa.sourceSetReady(req.tracks)
pa.sourceSetReady(req.tracks)
if pa.hasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
if pa.hasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
}
pa.describeRequestsOnHold = nil
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
} else {
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
case req := <-pa.sourceStaticSetNotReady:
if req.source == pa.source {
pa.sourceSetNotReady()
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
pa.sourceSetNotReady()
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
close(req.res)
if pa.shouldClose() {
@ -541,7 +546,6 @@ func (pa *path) run() { @@ -541,7 +546,6 @@ func (pa *path) run() {
if pa.source != nil {
if source, ok := pa.source.(*sourceStatic); ok {
source.close()
pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok {
source.close()
}
@ -582,7 +586,7 @@ func (pa *path) externalCmdEnv() externalcmd.Environment { @@ -582,7 +586,7 @@ func (pa *path) externalCmdEnv() externalcmd.Environment {
}
func (pa *path) onDemandStaticSourceStart() {
pa.staticSourceCreate()
pa.source.(*sourceStatic).start()
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
@ -605,8 +609,7 @@ func (pa *path) onDemandStaticSourceStop() { @@ -605,8 +609,7 @@ func (pa *path) onDemandStaticSourceStop() {
pa.onDemandStaticSourceState = pathOnDemandStateInitial
pa.source.(*sourceStatic).close()
pa.source = nil
pa.source.(*sourceStatic).stop()
}
func (pa *path) onDemandPublisherStart() {
@ -695,20 +698,6 @@ func (pa *path) sourceSetNotReady() { @@ -695,20 +698,6 @@ func (pa *path) sourceSetNotReady() {
}
}
func (pa *path) staticSourceCreate() {
pa.source = newSourceStatic(
pa.ctx,
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
&pa.sourceStaticWg,
pa)
}
func (pa *path) doReaderRemove(r reader) {
state := pa.readers[r]
@ -963,24 +952,35 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { @@ -963,24 +952,35 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
close(req.res)
}
// onSourceStaticSetReady is called by a sourceStatic.
func (pa *path) onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.res = make(chan pathSourceStaticSetReadyRes)
// onSourceStaticSetReady is called by sourceStatic.
func (pa *path) onSourceStaticSetReady(sourceStaticCtx context.Context, req pathSourceStaticSetReadyReq) {
select {
case pa.sourceStaticSetReady <- req:
return <-req.res
case <-pa.ctx.Done():
return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
// this avoids:
// - invalid requests sent after the source has been terminated
// - freezes caused by <-done inside stop()
case <-sourceStaticCtx.Done():
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
}
}
// onSourceStaticSetNotReady is called by a sourceStatic.
func (pa *path) onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.res = make(chan struct{})
// onSourceStaticSetNotReady is called by sourceStatic.
func (pa *path) onSourceStaticSetNotReady(sourceStaticCtx context.Context, req pathSourceStaticSetNotReadyReq) {
select {
case pa.sourceStaticSetNotReady <- req:
<-req.res
case <-pa.ctx.Done():
close(req.res)
// this avoids:
// - invalid requests sent after the source has been terminated
// - freezes caused by <-done inside stop()
case <-sourceStaticCtx.Done():
close(req.res)
}
}

10
internal/core/rtmp_source.go

@ -21,8 +21,8 @@ import ( @@ -21,8 +21,8 @@ import (
type rtmpSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtmpSource struct {
@ -119,9 +119,7 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -119,9 +119,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
tracks = append(tracks, audioTrack)
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
tracks: tracks,
})
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil {
return res.err
}
@ -129,7 +127,7 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -129,7 +127,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
s.Log(logger.Info, "ready")
defer func() {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
for {

10
internal/core/rtsp_source.go

@ -19,8 +19,8 @@ import ( @@ -19,8 +19,8 @@ import (
type rtspSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtspSource struct {
@ -125,9 +125,7 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -125,9 +125,7 @@ func (s *rtspSource) run(ctx context.Context) error {
}
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
tracks: c.Tracks(),
})
res := s.parent.onSourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
if res.err != nil {
return res.err
}
@ -135,7 +133,7 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -135,7 +133,7 @@ func (s *rtspSource) run(ctx context.Context) error {
s.Log(logger.Info, "ready")
defer func() {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.onSourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {

142
internal/core/source_static.go

@ -2,8 +2,8 @@ package core @@ -2,8 +2,8 @@ package core
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/aler9/rtsp-simple-server/internal/conf"
@ -22,8 +22,8 @@ type sourceStaticImpl interface { @@ -22,8 +22,8 @@ type sourceStaticImpl interface {
type sourceStaticParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticSetReady(context.Context, pathSourceStaticSetReadyReq)
onSourceStaticSetNotReady(context.Context, pathSourceStaticSetNotReadyReq)
}
// sourceStatic is a static source.
@ -35,16 +35,19 @@ type sourceStatic struct { @@ -35,16 +35,19 @@ type sourceStatic struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
wg *sync.WaitGroup
parent sourceStaticParent
impl sourceStaticImpl
ctx context.Context
ctxCancel func()
impl sourceStaticImpl
running bool
done chan struct{}
sourceStaticImplSetReady chan pathSourceStaticSetReadyReq
sourceStaticImplSetNotReady chan pathSourceStaticSetNotReadyReq
}
func newSourceStatic(
parentCtx context.Context,
ur string,
protocol conf.SourceProtocol,
anyPortEnable bool,
@ -52,23 +55,19 @@ func newSourceStatic( @@ -52,23 +55,19 @@ func newSourceStatic(
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
wg *sync.WaitGroup,
parent sourceStaticParent,
) *sourceStatic {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &sourceStatic{
ur: ur,
protocol: protocol,
anyPortEnable: anyPortEnable,
fingerprint: fingerprint,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ur: ur,
protocol: protocol,
anyPortEnable: anyPortEnable,
fingerprint: fingerprint,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
parent: parent,
sourceStaticImplSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticImplSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
}
switch {
@ -99,17 +98,41 @@ func newSourceStatic( @@ -99,17 +98,41 @@ func newSourceStatic(
s)
}
return s
}
func (s *sourceStatic) close() {
if s.running {
s.stop()
}
}
func (s *sourceStatic) start() {
if s.running {
panic("should not happen")
}
s.running = true
s.impl.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.done = make(chan struct{})
return s
go s.run()
}
func (s *sourceStatic) close() {
func (s *sourceStatic) stop() {
if !s.running {
panic("should not happen")
}
s.running = false
s.impl.Log(logger.Info, "stopped")
s.ctxCancel()
// we must wait since s.ctx is not thread safe
<-s.done
}
func (s *sourceStatic) log(level logger.Level, format string, args ...interface{}) {
@ -117,25 +140,11 @@ func (s *sourceStatic) log(level logger.Level, format string, args ...interface{ @@ -117,25 +140,11 @@ func (s *sourceStatic) log(level logger.Level, format string, args ...interface{
}
func (s *sourceStatic) run() {
defer s.wg.Done()
defer close(s.done)
outer:
for {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerErr := make(chan error)
go func() {
innerErr <- s.impl.run(innerCtx)
}()
select {
case err := <-innerErr:
innerCtxCancel()
s.impl.Log(logger.Info, "ERR: %v", err)
case <-s.ctx.Done():
innerCtxCancel()
<-innerErr
}
s.runInner()
select {
case <-time.After(sourceStaticRetryPause):
@ -147,19 +156,56 @@ outer: @@ -147,19 +156,56 @@ outer:
s.ctxCancel()
}
func (s *sourceStatic) runInner() {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
implErr := make(chan error)
go func() {
implErr <- s.impl.run(innerCtx)
}()
for {
select {
case err := <-implErr:
innerCtxCancel()
s.impl.Log(logger.Info, "ERR: %v", err)
return
case req := <-s.sourceStaticImplSetReady:
s.parent.onSourceStaticSetReady(s.ctx, req)
case req := <-s.sourceStaticImplSetNotReady:
s.parent.onSourceStaticSetNotReady(s.ctx, req)
case <-s.ctx.Done():
innerCtxCancel()
<-implErr
return
}
}
}
// onSourceAPIDescribe implements source.
func (s *sourceStatic) onSourceAPIDescribe() interface{} {
return s.impl.onSourceAPIDescribe()
}
// onSourceStaticSetReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.source = s
return s.parent.onSourceStaticSetReady(req)
// onSourceStaticImplSetReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.res = make(chan pathSourceStaticSetReadyRes)
select {
case s.sourceStaticImplSetReady <- req:
return <-req.res
case <-s.ctx.Done():
return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
}
}
// onSourceStaticSetNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.source = s
s.parent.onSourceStaticSetNotReady(req)
// onSourceStaticImplSetNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.res = make(chan struct{})
select {
case s.sourceStaticImplSetNotReady <- req:
<-req.res
case <-s.ctx.Done():
}
}

Loading…
Cancel
Save