Browse Source

Extended Pipeline manager to support Sink creation.

pull/272/head
Simon Eisenmann 10 years ago
parent
commit
96cc6fd64c
  1. 39
      go/channelling/bus_manager.go
  2. 31
      go/channelling/pipeline.go
  3. 74
      go/channelling/pipeline_manager.go
  4. 4
      go/channelling/roomworker.go
  5. 1
      go/channelling/sink.go
  6. 3
      src/app/spreed-webrtc-server/main.go

39
go/channelling/bus_manager.go

@ -28,6 +28,8 @@ import (
"time" "time"
"github.com/nats-io/nats" "github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/buffercache"
"github.com/strukturag/spreed-webrtc/go/natsconnection" "github.com/strukturag/spreed-webrtc/go/natsconnection"
) )
@ -43,10 +45,12 @@ const (
// A BusManager provides the API to interact with a bus. // A BusManager provides the API to interact with a bus.
type BusManager interface { type BusManager interface {
Start()
Publish(subject string, v interface{}) error Publish(subject string, v interface{}) error
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
CreateSink(string) Sink
} }
// A BusTrigger is a container to serialize trigger events // A BusTrigger is a container to serialize trigger events
@ -86,9 +90,6 @@ func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
} else { } else {
b = &noopBus{id} b = &noopBus{id}
} }
if err == nil {
b.Trigger(BusManagerStartup, id, "", nil, nil)
}
return &busManager{b} return &busManager{b}
} }
@ -97,6 +98,10 @@ type noopBus struct {
id string id string
} }
func (bus *noopBus) Start() {
// noop
}
func (bus *noopBus) Publish(subject string, v interface{}) error { func (bus *noopBus) Publish(subject string, v interface{}) error {
return nil return nil
} }
@ -113,6 +118,10 @@ func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscripti
return nil, nil return nil, nil
} }
func (bus *noopBus) CreateSink(id string) Sink {
return nil
}
type natsBus struct { type natsBus struct {
id string id string
prefix string prefix string
@ -136,6 +145,10 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
return &natsBus{id, prefix, ec, triggerQueue}, nil return &natsBus{id, prefix, ec, triggerQueue}, nil
} }
func (bus *natsBus) Start() {
bus.Trigger(BusManagerStartup, bus.id, "", nil, nil)
}
func (bus *natsBus) Publish(subject string, v interface{}) error { func (bus *natsBus) Publish(subject string, v interface{}) error {
return bus.ec.Publish(subject, v) return bus.ec.Publish(subject, v)
} }
@ -171,6 +184,10 @@ func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscripti
return bus.ec.Subscribe(subject, cb) return bus.ec.Subscribe(subject, cb)
} }
func (bus *natsBus) CreateSink(id string) Sink {
return &natsSink{}
}
type busQueueEntry struct { type busQueueEntry struct {
subject string subject string
data interface{} data interface{}
@ -185,3 +202,19 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr
} }
} }
} }
type natsSink struct {
}
func (sink *natsSink) Write(data interface{}) {
}
func (sink *natsSink) Send(b buffercache.Buffer) {
}
func (sink *natsSink) Enabled() bool {
return false
}
func (sink *natsSink) Close() {
}

31
go/channelling/pipeline.go

@ -154,26 +154,29 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
} }
func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool { func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
pipeline.mutex.RLock()
log.Println("Flush outgoing via pipeline", to, client == nil) log.Println("Flush outgoing via pipeline", to, client == nil)
if client == nil { if client == nil {
sink := pipeline.sink
pipeline.mutex.RUnlock()
pipeline.Add(outgoing) pipeline.Add(outgoing)
// It is possible to retrieve the userid for fake sessions here. pipeline.mutex.Lock()
if session, found := pipeline.PipelineManager.GetSession(to); found { sink := pipeline.sink
log.Println("Pipeline found userid via manager", session.Userid()) if sink != nil && sink.Enabled() {
// Sink it.
pipeline.mutex.Unlock()
sink.Write(outgoing)
return true
} }
if sink == nil { sink = pipeline.PipelineManager.FindSink(to)
return true if sink != nil {
err := pipeline.attach(sink)
if err == nil {
pipeline.mutex.Unlock()
return true
}
} }
// Sink it. pipeline.mutex.Unlock()
pipeline.sink.Write(outgoing)
return true
} }
pipeline.mutex.RUnlock()
return false return false
} }
@ -181,6 +184,10 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg
func (pipeline *Pipeline) Attach(sink Sink) error { func (pipeline *Pipeline) Attach(sink Sink) error {
pipeline.mutex.Lock() pipeline.mutex.Lock()
defer pipeline.mutex.Unlock() defer pipeline.mutex.Unlock()
return pipeline.attach(sink)
}
func (pipeline *Pipeline) attach(sink Sink) error {
if pipeline.sink != nil { if pipeline.sink != nil {
return errors.New("pipeline already attached to sink") return errors.New("pipeline already attached to sink")
} }

74
go/channelling/pipeline_manager.go

@ -39,6 +39,7 @@ type PipelineManager interface {
SessionCreator SessionCreator
GetPipelineByID(id string) (pipeline *Pipeline, ok bool) GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
FindSink(to string) Sink
} }
type pipelineManager struct { type pipelineManager struct {
@ -46,21 +47,25 @@ type pipelineManager struct {
SessionStore SessionStore
UserStore UserStore
SessionCreator SessionCreator
mutex sync.RWMutex mutex sync.RWMutex
pipelineTable map[string]*Pipeline pipelineTable map[string]*Pipeline
sessionTable map[string]*Session sessionTable map[string]*Session
duration time.Duration sessionByBusIDTable map[string]*Session
sessionSinkTable map[string]Sink
duration time.Duration
} }
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager { func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
plm := &pipelineManager{ plm := &pipelineManager{
BusManager: busManager, BusManager: busManager,
SessionStore: sessionStore, SessionStore: sessionStore,
UserStore: userStore, UserStore: userStore,
SessionCreator: sessionCreator, SessionCreator: sessionCreator,
pipelineTable: make(map[string]*Pipeline), pipelineTable: make(map[string]*Pipeline),
sessionTable: make(map[string]*Session), sessionTable: make(map[string]*Session),
duration: 30 * time.Minute, sessionByBusIDTable: make(map[string]*Session),
sessionSinkTable: make(map[string]Sink),
duration: 30 * time.Minute,
} }
plm.start() plm.start()
@ -97,22 +102,35 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre
return return
} }
var sink Sink
plm.mutex.Lock() plm.mutex.Lock()
session, ok := plm.sessionTable[msg.Id] session, ok := plm.sessionByBusIDTable[msg.Id]
if ok { if ok {
// Remove existing session with same ID. // Remove existing session with same ID.
delete(plm.sessionTable, session.Id)
sink, _ = plm.sessionSinkTable[session.Id]
delete(plm.sessionSinkTable, session.Id)
session.Close() session.Close()
if sink != nil {
sink.Close()
}
} }
session = plm.CreateSession(nil, "") session = plm.CreateSession(nil, "")
plm.sessionTable[msg.Id] = session plm.sessionByBusIDTable[msg.Id] = session
plm.sessionTable[session.Id] = session
if sink == nil {
sink = plm.CreateSink(msg.Id)
}
plm.sessionSinkTable[session.Id] = sink
plm.mutex.Unlock() plm.mutex.Unlock()
session.Status = msg.Session.Status session.Status = msg.Session.Status
session.SetUseridFake(msg.Session.Userid) session.SetUseridFake(msg.Session.Userid)
pipeline := plm.GetPipeline("", nil, session, "") //pipeline := plm.GetPipeline("", nil, session, "")
if msg.Room != nil { if msg.Room != nil {
room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, pipeline) room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, nil)
log.Println("Joined NATS session to room", room, err) log.Println("Joined NATS session to room", room, err)
} }
@ -127,7 +145,15 @@ func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
} }
plm.mutex.Lock() plm.mutex.Lock()
session, ok := plm.sessionTable[id] session, ok := plm.sessionByBusIDTable[id]
if ok {
delete(plm.sessionByBusIDTable, id)
delete(plm.sessionTable, session.Id)
if sink, ok := plm.sessionSinkTable[session.Id]; ok {
delete(plm.sessionSinkTable, session.Id)
sink.Close()
}
}
plm.mutex.Unlock() plm.mutex.Unlock()
if ok { if ok {
@ -172,3 +198,19 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
return pipeline return pipeline
} }
func (plm *pipelineManager) FindSink(to string) Sink {
// It is possible to retrieve the userid for fake sessions here.
plm.mutex.RLock()
if sink, found := plm.sessionSinkTable[to]; found {
plm.mutex.RUnlock()
if sink.Enabled() {
log.Println("Pipeline sink found via manager", sink)
return sink
}
return nil
}
plm.mutex.RUnlock()
return nil
}

4
go/channelling/roomworker.go

@ -225,8 +225,8 @@ func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
for id, user := range r.users { for id, user := range r.users {
if id == sessionID { if id == sessionID || user.Sender == nil {
// Skip broadcast to self. // Skip broadcast to self or non existing sender.
continue continue
} }
//fmt.Printf("%s\n", m.Message) //fmt.Printf("%s\n", m.Message)

1
go/channelling/sink.go

@ -31,5 +31,6 @@ type Sink interface {
// Write sends outgoing data on the sink from the // Write sends outgoing data on the sink from the
Write(interface{}) Write(interface{})
Send(buffercache.Buffer) Send(buffercache.Buffer)
Enabled() bool
Close() Close()
} }

3
src/app/spreed-webrtc-server/main.go

@ -269,6 +269,9 @@ func runner(runtime phoenix.Runtime) error {
pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager) channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
// Start bus.
busManager.Start()
// Add handlers. // Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour)))

Loading…
Cancel
Save