Browse Source

Implement NATS sink outbound encoding.

pull/272/head
Simon Eisenmann 9 years ago
parent
commit
33aa9e4e15
  1. 21
      go/channelling/api.go
  2. 5
      go/channelling/bus.go
  3. 124
      go/channelling/bus_manager.go
  4. 1
      go/channelling/data.go
  5. 69
      go/channelling/pipeline.go
  6. 39
      go/channelling/pipeline_manager.go
  7. 4
      go/channelling/server/pipelines.go
  8. 9
      go/channelling/sink.go
  9. 3
      go/natsconnection/natsconnection.go
  10. 6
      src/app/spreed-webrtc-server/main.go

21
go/channelling/api.go

@ -26,3 +26,24 @@ type ChannellingAPI interface { @@ -26,3 +26,24 @@ type ChannellingAPI interface {
OnDisconnect(*Client, *Session)
OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error)
}
type ChannellingAPIConsumer interface {
SetChannellingAPI(ChannellingAPI)
GetChannellingAPI() ChannellingAPI
}
type channellingAPIConsumer struct {
ChannellingAPI ChannellingAPI
}
func NewChannellingAPIConsumer() ChannellingAPIConsumer {
return &channellingAPIConsumer{}
}
func (c *channellingAPIConsumer) SetChannellingAPI(api ChannellingAPI) {
c.ChannellingAPI = api
}
func (c *channellingAPIConsumer) GetChannellingAPI() ChannellingAPI {
return c.ChannellingAPI
}

5
go/channelling/bus.go

@ -26,3 +26,8 @@ type SessionCreateRequest struct { @@ -26,3 +26,8 @@ type SessionCreateRequest struct {
Session *DataSession
Room *DataRoom
}
type DataSink struct {
SubjectOut string `json:subject_out"`
SubjectIn string `json:subject_in"`
}

124
go/channelling/bus_manager.go

@ -25,11 +25,11 @@ import ( @@ -25,11 +25,11 @@ import (
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/buffercache"
"github.com/strukturag/spreed-webrtc/go/natsconnection"
)
@ -45,11 +45,15 @@ const ( @@ -45,11 +45,15 @@ const (
// A BusManager provides the API to interact with a bus.
type BusManager interface {
ChannellingAPIConsumer
Start()
Publish(subject string, v interface{}) error
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error)
BindSendChan(subject string, channel interface{}) error
PrefixSubject(string) string
CreateSink(string) Sink
}
@ -69,32 +73,29 @@ func BusSubjectTrigger(prefix, suffix string) string { @@ -69,32 +73,29 @@ func BusSubjectTrigger(prefix, suffix string) string {
return fmt.Sprintf("%s.%s", prefix, suffix)
}
type busManager struct {
BusManager
}
// NewBusManager creates and initializes a new BusMager with the
// provided flags for NATS support. It is intended to connect the
// backend bus with a easy to use API to send and receive bus data.
func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager {
var b BusManager
var err error
if useNats {
b, err = newNatsBus(id, subjectPrefix)
b, err = newNatsBus(apiConsumer, id, subjectPrefix)
if err == nil {
log.Println("NATS bus connected")
} else {
log.Println("Error connecting NATS bus", err)
b = &noopBus{id}
b = &noopBus{apiConsumer, id}
}
} else {
b = &noopBus{id}
b = &noopBus{apiConsumer, id}
}
return &busManager{b}
return b
}
type noopBus struct {
ChannellingAPIConsumer
id string
}
@ -114,6 +115,18 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeli @@ -114,6 +115,18 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeli
return nil
}
func (bus *noopBus) PrefixSubject(subject string) string {
return subject
}
func (bus *noopBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {
return nil, nil
}
func (bus *noopBus) BindSendChan(subject string, channel interface{}) error {
return nil
}
func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return nil, nil
}
@ -123,13 +136,14 @@ func (bus *noopBus) CreateSink(id string) Sink { @@ -123,13 +136,14 @@ func (bus *noopBus) CreateSink(id string) Sink {
}
type natsBus struct {
ChannellingAPIConsumer
id string
prefix string
ec *natsconnection.EncodedConnection
triggerQueue chan *busQueueEntry
}
func newNatsBus(id, prefix string) (*natsBus, error) {
func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) {
ec, err := natsconnection.EstablishJSONEncodedConnection(nil)
if err != nil {
return nil, err
@ -139,13 +153,13 @@ func newNatsBus(id, prefix string) (*natsBus, error) { @@ -139,13 +153,13 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
}
// Create buffered channel for outbound NATS data.
triggerQueue := make(chan *busQueueEntry, 50)
// Start go routine to process outbount NATS publishing.
go chPublish(ec, triggerQueue)
return &natsBus{id, prefix, ec, triggerQueue}, nil
return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil
}
func (bus *natsBus) Start() {
// Start go routine to process outbount NATS publishing.
go chPublish(bus.ec, bus.triggerQueue)
bus.Trigger(BusManagerStartup, bus.id, "", nil, nil)
}
@ -180,12 +194,25 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli @@ -180,12 +194,25 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
return err
}
func (bus *natsBus) PrefixSubject(sub string) string {
return fmt.Sprintf("%s.%s", bus.prefix, sub)
}
func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return bus.ec.Subscribe(subject, cb)
}
func (bus *natsBus) CreateSink(id string) Sink {
return &natsSink{}
func (bus *natsBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {
return bus.ec.BindRecvChan(subject, channel)
}
func (bus *natsBus) BindSendChan(subject string, channel interface{}) error {
return bus.ec.BindSendChan(subject, channel)
}
func (bus *natsBus) CreateSink(id string) (sink Sink) {
sink = newNatsSink(bus, id)
return
}
type busQueueEntry struct {
@ -204,17 +231,76 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr @@ -204,17 +231,76 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr
}
type natsSink struct {
sync.RWMutex
id string
bm BusManager
closed bool
SubjectOut string
SubjectIn string
sub *nats.Subscription
sendQueue chan *DataOutgoing
}
func (sink *natsSink) Write(data interface{}) {
func newNatsSink(bm BusManager, id string) *natsSink {
sink := &natsSink{
id: id,
bm: bm,
SubjectOut: bm.PrefixSubject(fmt.Sprintf("sink.%s.out", id)),
SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)),
}
sink.sendQueue = make(chan *DataOutgoing, 100)
bm.BindSendChan(sink.SubjectOut, sink.sendQueue)
return sink
}
func (sink *natsSink) Send(b buffercache.Buffer) {
func (sink *natsSink) Write(outgoing *DataOutgoing) (err error) {
if sink.Enabled() {
log.Println("Sending via NATS sink", sink.SubjectOut, outgoing)
sink.sendQueue <- outgoing
}
return err
}
func (sink *natsSink) Enabled() bool {
return false
sink.RLock()
defer sink.RUnlock()
return sink.closed == false
}
func (sink *natsSink) Close() {
sink.Lock()
defer sink.Unlock()
if sink.sub != nil {
err := sink.sub.Unsubscribe()
if err != nil {
log.Println("Failed to unsubscribe NATS sink", err)
} else {
sink.sub = nil
}
}
sink.closed = true
}
func (sink *natsSink) Export() *DataSink {
return &DataSink{
SubjectOut: sink.SubjectOut,
SubjectIn: sink.SubjectIn,
}
}
func (sink *natsSink) BindRecvChan(channel interface{}) (*nats.Subscription, error) {
sink.Lock()
defer sink.Unlock()
if sink.sub != nil {
sink.sub.Unsubscribe()
sink.sub = nil
}
sub, err := sink.bm.BindRecvChan(sink.SubjectIn, channel)
if err != nil {
return nil, err
}
sink.sub = sub
return sub, nil
}

1
go/channelling/data.go

@ -203,6 +203,7 @@ type DataOutgoing struct { @@ -203,6 +203,7 @@ type DataOutgoing struct {
To string `json:",omitempty"`
Iid string `json:",omitempty"`
A string `json:",omitempty"`
Pipe string `json:",omitempty"`
}
type DataSessions struct {

69
go/channelling/pipeline.go

@ -43,9 +43,11 @@ type Pipeline struct { @@ -43,9 +43,11 @@ type Pipeline struct {
namespace string
id string
from *Session
to *Session
expires *time.Time
data []*DataOutgoing
sink Sink
recvQueue chan *DataIncoming
}
func NewPipeline(manager PipelineManager,
@ -58,11 +60,27 @@ func NewPipeline(manager PipelineManager, @@ -58,11 +60,27 @@ func NewPipeline(manager PipelineManager,
namespace: namespace,
id: id,
from: from,
recvQueue: make(chan *DataIncoming, 100),
}
go pipeline.receive()
pipeline.Refresh(duration)
return pipeline
}
func (pipeline *Pipeline) receive() {
// XXX(longsleep): Make sure this does not leak go routines.
// XXX(longsleep): Call to ToSession() should be avoided because it locks.
api := pipeline.PipelineManager.GetChannellingAPI()
for data := range pipeline.recvQueue {
_, err := api.OnIncoming(nil, pipeline.ToSession(), data)
if err != nil {
// TODO(longsleep): Handle reply and error.
log.Println("Pipeline receive incoming error", err)
}
}
log.Println("Pipline receive exit")
}
func (pipeline *Pipeline) GetID() string {
return pipeline.id
}
@ -75,6 +93,7 @@ func (pipeline *Pipeline) Refresh(duration time.Duration) { @@ -75,6 +93,7 @@ func (pipeline *Pipeline) Refresh(duration time.Duration) {
}
func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
msg.Pipe = pipeline.id
pipeline.mutex.Lock()
pipeline.data = append(pipeline.data, msg)
pipeline.mutex.Unlock()
@ -83,13 +102,7 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline { @@ -83,13 +102,7 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
}
func (pipeline *Pipeline) Send(b buffercache.Buffer) {
pipeline.mutex.RLock()
sink := pipeline.sink
pipeline.mutex.RUnlock()
if sink != nil {
// Send it through sink.
sink.Send(b)
}
// Noop.
}
func (pipeline *Pipeline) Index() uint64 {
@ -100,7 +113,6 @@ func (pipeline *Pipeline) Close() { @@ -100,7 +113,6 @@ func (pipeline *Pipeline) Close() {
pipeline.mutex.Lock()
pipeline.expires = nil
if pipeline.sink != nil {
pipeline.sink.Close()
pipeline.sink = nil
}
pipeline.mutex.Unlock()
@ -119,10 +131,18 @@ func (pipeline *Pipeline) Expired() bool { @@ -119,10 +131,18 @@ func (pipeline *Pipeline) Expired() bool {
return expired
}
func (pipeline *Pipeline) Session() *Session {
func (pipeline *Pipeline) FromSession() *Session {
pipeline.mutex.RLock()
defer pipeline.mutex.RUnlock()
return pipeline.from
}
func (pipeline *Pipeline) ToSession() *Session {
pipeline.mutex.RLock()
defer pipeline.mutex.RUnlock()
return pipeline.to
}
func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
pipeline.mutex.RLock()
var lineRaw []byte
@ -154,7 +174,7 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) { @@ -154,7 +174,7 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
}
func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
log.Println("Flush outgoing via pipeline", to, client == nil)
//log.Println("Flush outgoing via pipeline", to, client == nil)
if client == nil {
pipeline.Add(outgoing)
@ -167,11 +187,18 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg @@ -167,11 +187,18 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg
return true
}
sink = pipeline.PipelineManager.FindSink(to)
var session *Session
sink, session = pipeline.PipelineManager.FindSinkAndSession(to)
if sink != nil {
pipeline.to = session
err := pipeline.attach(sink)
if err == nil {
pipeline.mutex.Unlock()
// Create incoming receiver.
sink.BindRecvChan(pipeline.recvQueue)
sink.Write(outgoing)
return true
}
}
@ -184,7 +211,18 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg @@ -184,7 +211,18 @@ func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outg
func (pipeline *Pipeline) Attach(sink Sink) error {
pipeline.mutex.Lock()
defer pipeline.mutex.Unlock()
return pipeline.attach(sink)
// Sink existing data first.
log.Println("Attach sink to pipeline", pipeline.id)
err := pipeline.attach(sink)
if err == nil {
for _, msg := range pipeline.data {
log.Println("Flushing pipeline to sink after attach", len(pipeline.data))
sink.Write(msg)
}
}
return err
}
func (pipeline *Pipeline) attach(sink Sink) error {
@ -192,12 +230,5 @@ func (pipeline *Pipeline) attach(sink Sink) error { @@ -192,12 +230,5 @@ func (pipeline *Pipeline) attach(sink Sink) error {
return errors.New("pipeline already attached to sink")
}
pipeline.sink = sink
// Sink existing data first.
log.Println("Attach sink to pipeline", pipeline.id)
for _, msg := range pipeline.data {
sink.Write(msg)
}
return nil
}

39
go/channelling/pipeline_manager.go

@ -39,7 +39,7 @@ type PipelineManager interface { @@ -39,7 +39,7 @@ type PipelineManager interface {
SessionCreator
GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
FindSink(to string) Sink
FindSinkAndSession(to string) (Sink, *Session)
}
type pipelineManager struct {
@ -112,22 +112,28 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre @@ -112,22 +112,28 @@ func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCre
sink, _ = plm.sessionSinkTable[session.Id]
delete(plm.sessionSinkTable, session.Id)
session.Close()
if sink != nil {
sink.Close()
}
}
session = plm.CreateSession(nil, "")
plm.sessionByBusIDTable[msg.Id] = session
plm.sessionTable[session.Id] = session
if sink == nil {
sink = plm.CreateSink(msg.Id)
log.Println("Created NATS sink", msg.Id)
}
if reply != "" {
// Always reply with our sink data
plm.Publish(reply, sink.Export())
}
plm.sessionSinkTable[session.Id] = sink
plm.mutex.Unlock()
session.Status = msg.Session.Status
session.SetUseridFake(msg.Session.Userid)
//pipeline := plm.GetPipeline("", nil, session, "")
if msg.Session.Status != nil {
session.Status = msg.Session.Status
}
if msg.Session.Userid != "" {
session.SetUseridFake(msg.Session.Userid)
}
if msg.Room != nil {
room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, nil)
@ -164,13 +170,6 @@ func (plm *pipelineManager) sessionClose(subject, reply string, id string) { @@ -164,13 +170,6 @@ func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
plm.mutex.RLock()
pipeline, ok := plm.pipelineTable[id]
if !ok {
// XXX(longsleep): Hack for development
for _, pipeline = range plm.pipelineTable {
ok = true
break
}
}
plm.mutex.RUnlock()
return pipeline, ok
}
@ -199,18 +198,18 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session @@ -199,18 +198,18 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
return pipeline
}
func (plm *pipelineManager) FindSink(to string) Sink {
// It is possible to retrieve the userid for fake sessions here.
func (plm *pipelineManager) FindSinkAndSession(to string) (Sink, *Session) {
plm.mutex.RLock()
if sink, found := plm.sessionSinkTable[to]; found {
session, _ := plm.sessionTable[to]
plm.mutex.RUnlock()
if sink.Enabled() {
log.Println("Pipeline sink found via manager", sink)
return sink
//log.Println("Pipeline sink found via manager", sink)
return sink, session
}
return nil
return nil, nil
}
plm.mutex.RUnlock()
return nil
return nil, nil
}

4
go/channelling/server/pipelines.go

@ -84,10 +84,10 @@ func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http. @@ -84,10 +84,10 @@ func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http.
}
result := &channelling.DataOutgoing{
From: pipeline.Session().Id,
From: pipeline.FromSession().Id,
Iid: incoming.Iid,
}
reply, err := pipelines.API.OnIncoming(pipeline, pipeline.Session(), &incoming)
reply, err := pipelines.API.OnIncoming(pipeline, pipeline.ToSession(), &incoming)
if err == nil {
result.Data = reply
} else {

9
go/channelling/sink.go

@ -22,15 +22,16 @@ @@ -22,15 +22,16 @@
package channelling
import (
"github.com/strukturag/spreed-webrtc/go/buffercache"
"github.com/nats-io/nats"
)
// Sink connects a Pipeline with end points in both directions by
// getting attached to a Pipeline.
type Sink interface {
// Write sends outgoing data on the sink from the
Write(interface{})
Send(buffercache.Buffer)
// Write sends outgoing data on the sink
Write(*DataOutgoing) error
Enabled() bool
Close()
Export() *DataSink
BindRecvChan(channel interface{}) (*nats.Subscription, error)
}

3
go/natsconnection/natsconnection.go

@ -48,6 +48,9 @@ func NewConnection() (*Connection, error) { @@ -48,6 +48,9 @@ func NewConnection() (*Connection, error) {
ReconnectedCB: func(conn *nats.Conn) {
log.Println("NATS reconnected")
},
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("NATS async error", sub, err)
},
}
nc, err := opts.Connect()

6
src/app/spreed-webrtc-server/main.go

@ -258,6 +258,7 @@ func runner(runtime phoenix.Runtime) error { @@ -258,6 +258,7 @@ func runner(runtime phoenix.Runtime) error {
}
// Prepare services.
apiConsumer := channelling.NewChannellingAPIConsumer()
buddyImages := channelling.NewImageCache()
codec := channelling.NewCodec(incomingCodecLimit)
roomManager := channelling.NewRoomManager(config, codec)
@ -265,9 +266,12 @@ func runner(runtime phoenix.Runtime) error { @@ -265,9 +266,12 @@ func runner(runtime phoenix.Runtime) error {
tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
// Create API.
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
apiConsumer.SetChannellingAPI(channellingAPI)
// Start bus.
busManager.Start()

Loading…
Cancel
Save