|
|
|
@ -54,6 +54,7 @@ type pipelineManager struct {
@@ -54,6 +54,7 @@ type pipelineManager struct {
|
|
|
|
|
sessionSinkTable map[string]Sink |
|
|
|
|
duration time.Duration |
|
|
|
|
defaultSinkID string |
|
|
|
|
enabled bool |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager { |
|
|
|
@ -68,12 +69,17 @@ func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userSt
@@ -68,12 +69,17 @@ func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userSt
|
|
|
|
|
sessionSinkTable: make(map[string]Sink), |
|
|
|
|
duration: 60 * time.Second, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return plm |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (plm *pipelineManager) Start() { |
|
|
|
|
plm.enabled = true |
|
|
|
|
|
|
|
|
|
plm.start() |
|
|
|
|
|
|
|
|
|
plm.Subscribe("channelling.session.create", plm.sessionCreate) |
|
|
|
|
plm.Subscribe("channelling.session.close", plm.sessionClose) |
|
|
|
|
|
|
|
|
|
return plm |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (plm *pipelineManager) cleanup() { |
|
|
|
@ -185,6 +191,10 @@ func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session
@@ -185,6 +191,10 @@ func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline { |
|
|
|
|
if !plm.enabled { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
id := plm.PipelineID(namespace, sender, session, to) |
|
|
|
|
|
|
|
|
|
plm.mutex.Lock() |
|
|
|
|