From 96cc6fd64c294f64456f83f656987dfafc314ac1 Mon Sep 17 00:00:00 2001
From: Simon Eisenmann <simon@struktur.de>
Date: Mon, 4 Apr 2016 17:54:29 +0200
Subject: [PATCH] Extended Pipeline manager to support Sink creation.

---
 go/channelling/bus_manager.go        | 39 +++++++++++++--
 go/channelling/pipeline.go           | 31 +++++++-----
 go/channelling/pipeline_manager.go   | 74 ++++++++++++++++++++++------
 go/channelling/roomworker.go         |  4 +-
 go/channelling/sink.go               |  1 +
 src/app/spreed-webrtc-server/main.go |  3 ++
 6 files changed, 119 insertions(+), 33 deletions(-)

diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go
index ec99c60f..c04c698c 100644
--- a/go/channelling/bus_manager.go
+++ b/go/channelling/bus_manager.go
@@ -28,6 +28,8 @@ import (
 	"time"
 
 	"github.com/nats-io/nats"
+
+	"github.com/strukturag/spreed-webrtc/go/buffercache"
 	"github.com/strukturag/spreed-webrtc/go/natsconnection"
 )
 
@@ -43,10 +45,12 @@ const (
 
 // A BusManager provides the API to interact with a bus.
 type BusManager interface {
+	Start()
 	Publish(subject string, v interface{}) error
 	Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
 	Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
 	Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
+	CreateSink(string) Sink
 }
 
 // A BusTrigger is a container to serialize trigger events
@@ -86,9 +90,6 @@ func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
 	} else {
 		b = &noopBus{id}
 	}
-	if err == nil {
-		b.Trigger(BusManagerStartup, id, "", nil, nil)
-	}
 
 	return &busManager{b}
 }
@@ -97,6 +98,10 @@ type noopBus struct {
 	id string
 }
 
+func (bus *noopBus) Start() {
+	// noop
+}
+
 func (bus *noopBus) Publish(subject string, v interface{}) error {
 	return nil
 }
@@ -113,6 +118,10 @@ func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscripti
 	return nil, nil
 }
 
+func (bus *noopBus) CreateSink(id string) Sink {
+	return nil
+}
+
 type natsBus struct {
 	id           string
 	prefix       string
@@ -136,6 +145,10 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
 	return &natsBus{id, prefix, ec, triggerQueue}, nil
 }
 
+func (bus *natsBus) Start() {
+	bus.Trigger(BusManagerStartup, bus.id, "", nil, nil)
+}
+
 func (bus *natsBus) Publish(subject string, v interface{}) error {
 	return bus.ec.Publish(subject, v)
 }
@@ -171,6 +184,10 @@ func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscripti
 	return bus.ec.Subscribe(subject, cb)
 }
 
+func (bus *natsBus) CreateSink(id string) Sink {
+	return &natsSink{}
+}
+
 type busQueueEntry struct {
 	subject string
 	data    interface{}
@@ -185,3 +202,19 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr
 		}
 	}
 }
+
+type natsSink struct {
+}
+
+func (sink *natsSink) Write(data interface{}) {
+}
+
+func (sink *natsSink) Send(b buffercache.Buffer) {
+}
+
+func (sink *natsSink) Enabled() bool {
+	return false
+}
+
+func (sink *natsSink) Close() {
+}
diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go
index fdf70e44..dc34b55a 100644
--- a/go/channelling/pipeline.go
+++ b/go/channelling/pipeline.go
@@ -154,26 +154,29 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
 }
 
 func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
-	pipeline.mutex.RLock()
 	log.Println("Flush outgoing via pipeline", to, client == nil)
 	if client == nil {
-		sink := pipeline.sink
-		pipeline.mutex.RUnlock()
 		pipeline.Add(outgoing)
 
-		// It is possible to retrieve the userid for fake sessions here.
-		if session, found := pipeline.PipelineManager.GetSession(to); found {
-			log.Println("Pipeline found userid via manager", session.Userid())
+		pipeline.mutex.Lock()
+		sink := pipeline.sink
+		if sink != nil && sink.Enabled() {
+			// Sink it.
+			pipeline.mutex.Unlock()
+			sink.Write(outgoing)
+			return true
 		}
 
-		if sink == nil {
-			return true
+		sink = pipeline.PipelineManager.FindSink(to)
+		if sink != nil {
+			err := pipeline.attach(sink)
+			if err == nil {
+				pipeline.mutex.Unlock()
+				return true
+			}
 		}
-		// Sink it.
-		pipeline.sink.Write(outgoing)
-		return true
+		pipeline.mutex.Unlock()
 	}
-	pipeline.mutex.RUnlock()
 
 	return false
 }
@@ -181,6 +184,10 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg
 func (pipeline *Pipeline) Attach(sink Sink) error {
 	pipeline.mutex.Lock()
 	defer pipeline.mutex.Unlock()
+	return pipeline.attach(sink)
+}
+
+func (pipeline *Pipeline) attach(sink Sink) error {
 	if pipeline.sink != nil {
 		return errors.New("pipeline already attached to sink")
 	}
diff --git a/go/channelling/pipeline_manager.go b/go/channelling/pipeline_manager.go
index a078de4e..fa69ef84 100644
--- a/go/channelling/pipeline_manager.go
+++ b/go/channelling/pipeline_manager.go
@@ -39,6 +39,7 @@ type PipelineManager interface {
 	SessionCreator
 	GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
 	GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
+	FindSink(to string) Sink
 }
 
 type pipelineManager struct {
@@ -46,21 +47,25 @@ type pipelineManager struct {
 	SessionStore
 	UserStore
 	SessionCreator
-	mutex         sync.RWMutex
-	pipelineTable map[string]*Pipeline
-	sessionTable  map[string]*Session
-	duration      time.Duration
+	mutex               sync.RWMutex
+	pipelineTable       map[string]*Pipeline
+	sessionTable        map[string]*Session
+	sessionByBusIDTable map[string]*Session
+	sessionSinkTable    map[string]Sink
+	duration            time.Duration
 }
 
 func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
 	plm := &pipelineManager{
-		BusManager:     busManager,
-		SessionStore:   sessionStore,
-		UserStore:      userStore,
-		SessionCreator: sessionCreator,
-		pipelineTable:  make(map[string]*Pipeline),
-		sessionTable:   make(map[string]*Session),
-		duration:       30 * time.Minute,
+		BusManager:          busManager,
+		SessionStore:        sessionStore,
+		UserStore:           userStore,
+		SessionCreator:      sessionCreator,
+		pipelineTable:       make(map[string]*Pipeline),
+		sessionTable:        make(map[string]*Session),
+		sessionByBusIDTable: make(map[string]*Session),
+		sessionSinkTable:    make(map[string]Sink),
+		duration:            30 * time.Minute,
 	}
 	plm.start()
 
@@ -97,22 +102,35 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre
 		return
 	}
 
+	var sink Sink
+
 	plm.mutex.Lock()
-	session, ok := plm.sessionTable[msg.Id]
+	session, ok := plm.sessionByBusIDTable[msg.Id]
 	if ok {
 		// Remove existing session with same ID.
+		delete(plm.sessionTable, session.Id)
+		sink, _ = plm.sessionSinkTable[session.Id]
+		delete(plm.sessionSinkTable, session.Id)
 		session.Close()
+		if sink != nil {
+			sink.Close()
+		}
 	}
 	session = plm.CreateSession(nil, "")
-	plm.sessionTable[msg.Id] = session
+	plm.sessionByBusIDTable[msg.Id] = session
+	plm.sessionTable[session.Id] = session
+	if sink == nil {
+		sink = plm.CreateSink(msg.Id)
+	}
+	plm.sessionSinkTable[session.Id] = sink
 	plm.mutex.Unlock()
 
 	session.Status = msg.Session.Status
 	session.SetUseridFake(msg.Session.Userid)
-	pipeline := plm.GetPipeline("", nil, session, "")
+	//pipeline := plm.GetPipeline("", nil, session, "")
 
 	if msg.Room != nil {
-		room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, pipeline)
+		room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, nil)
 		log.Println("Joined NATS session to room", room, err)
 	}
 
@@ -127,7 +145,15 @@ func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
 	}
 
 	plm.mutex.Lock()
-	session, ok := plm.sessionTable[id]
+	session, ok := plm.sessionByBusIDTable[id]
+	if ok {
+		delete(plm.sessionByBusIDTable, id)
+		delete(plm.sessionTable, session.Id)
+		if sink, ok := plm.sessionSinkTable[session.Id]; ok {
+			delete(plm.sessionSinkTable, session.Id)
+			sink.Close()
+		}
+	}
 	plm.mutex.Unlock()
 
 	if ok {
@@ -172,3 +198,19 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
 
 	return pipeline
 }
+
+func (plm *pipelineManager) FindSink(to string) Sink {
+	// It is possible to retrieve the userid for fake sessions here.
+	plm.mutex.RLock()
+	if sink, found := plm.sessionSinkTable[to]; found {
+		plm.mutex.RUnlock()
+		if sink.Enabled() {
+			log.Println("Pipeline sink found via manager", sink)
+			return sink
+		}
+		return nil
+	}
+
+	plm.mutex.RUnlock()
+	return nil
+}
diff --git a/go/channelling/roomworker.go b/go/channelling/roomworker.go
index 8450f069..d991ca24 100644
--- a/go/channelling/roomworker.go
+++ b/go/channelling/roomworker.go
@@ -225,8 +225,8 @@ func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) {
 	worker := func() {
 		r.mutex.RLock()
 		for id, user := range r.users {
-			if id == sessionID {
-				// Skip broadcast to self.
+			if id == sessionID || user.Sender == nil {
+				// Skip broadcast to self or non existing sender.
 				continue
 			}
 			//fmt.Printf("%s\n", m.Message)
diff --git a/go/channelling/sink.go b/go/channelling/sink.go
index 0da8611a..11a28398 100644
--- a/go/channelling/sink.go
+++ b/go/channelling/sink.go
@@ -31,5 +31,6 @@ type Sink interface {
 	// Write sends outgoing data on the sink from the
 	Write(interface{})
 	Send(buffercache.Buffer)
+	Enabled() bool
 	Close()
 }
diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go
index 2149a8a0..9c8bfb6e 100644
--- a/src/app/spreed-webrtc-server/main.go
+++ b/src/app/spreed-webrtc-server/main.go
@@ -269,6 +269,9 @@ func runner(runtime phoenix.Runtime) error {
 	pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
 	channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
 
+	// Start bus.
+	busManager.Start()
+
 	// Add handlers.
 	r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
 	r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour)))