Browse Source

Added default sink.

pull/272/head
Simon Eisenmann 9 years ago
parent
commit
71035380d8
  1. 7
      go/channelling/api/api_test.go
  2. 7
      go/channelling/bus.go
  3. 31
      go/channelling/pipeline_manager.go

7
go/channelling/api/api_test.go

@ -83,11 +83,14 @@ func (fake *fakeRoomManager) MakeRoomID(roomName, roomType string) string { @@ -83,11 +83,14 @@ func (fake *fakeRoomManager) MakeRoomID(roomName, roomType string) string {
}
func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channelling.Session, *fakeRoomManager) {
apiConsumer := channelling.NewChannellingAPIConsumer()
client, roomManager := &fakeClient{}, &fakeRoomManager{}
sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil)
session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "")
busManager := channelling.NewBusManager("", false, "")
return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil), client, session, roomManager
busManager := channelling.NewBusManager(apiConsumer, "", false, "")
api := New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil)
apiConsumer.SetChannellingAPI(api)
return api, client, session, roomManager
}
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {

7
go/channelling/bus.go

@ -22,9 +22,10 @@ @@ -22,9 +22,10 @@
package channelling
type SessionCreateRequest struct {
Id string
Session *DataSession
Room *DataRoom
Id string
Session *DataSession
Room *DataRoom
SetAsDefault bool
}
type DataSink struct {

31
go/channelling/pipeline_manager.go

@ -53,6 +53,7 @@ type pipelineManager struct { @@ -53,6 +53,7 @@ type pipelineManager struct {
sessionByBusIDTable map[string]*Session
sessionSinkTable map[string]Sink
duration time.Duration
defaultSinkID string
}
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
@ -125,6 +126,11 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre @@ -125,6 +126,11 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre
plm.Publish(reply, sink.Export())
}
plm.sessionSinkTable[session.Id] = sink
if msg.SetAsDefault {
plm.defaultSinkID = session.Id
log.Println("Using NATS sink as default session", session.Id)
}
plm.mutex.Unlock()
if msg.Session.Status != nil {
@ -198,18 +204,31 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session @@ -198,18 +204,31 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
return pipeline
}
func (plm *pipelineManager) FindSinkAndSession(to string) (Sink, *Session) {
func (plm *pipelineManager) FindSinkAndSession(to string) (sink Sink, session *Session) {
plm.mutex.RLock()
if sink, found := plm.sessionSinkTable[to]; found {
session, _ := plm.sessionTable[to]
var found bool
if sink, found = plm.sessionSinkTable[to]; found {
session, _ = plm.sessionTable[to]
plm.mutex.RUnlock()
if sink.Enabled() {
//log.Println("Pipeline sink found via manager", sink)
log.Println("Pipeline sink found via manager", sink)
return sink, session
}
return nil, nil
} else {
plm.mutex.RUnlock()
}
if plm.defaultSinkID != "" && to != plm.defaultSinkID {
// Keep target to while returning a the default sink.
log.Println("Find sink via default sink ID", plm.defaultSinkID)
sink, _ = plm.FindSinkAndSession(plm.defaultSinkID)
if sink != nil {
if session, found = plm.GetSession(to); found {
return
}
}
}
plm.mutex.RUnlock()
return nil, nil
}

Loading…
Cancel
Save