Browse Source

Added startup bus event and a NATS client id.

pull/266/head
Simon Eisenmann 10 years ago
parent
commit
d782eb26d8
  1. 5
      server.conf.in
  2. 23
      src/app/spreed-webrtc-server/bus_manager.go
  3. 3
      src/app/spreed-webrtc-server/main.go

5
server.conf.in

@ -202,4 +202,7 @@ enabled = false @@ -202,4 +202,7 @@ enabled = false
; NATS server URL
;url = nats://127.0.0.1:4222
; NATS connect establish timeout in seconds
;establishTimeout = 60
;establishTimeout = 60
; Use client_id to distinguish between multipe servers. The value is sent
; together with every NATS request. Defaults to empty.
;client_id =

23
src/app/spreed-webrtc-server/bus_manager.go

@ -29,6 +29,7 @@ import ( @@ -29,6 +29,7 @@ import (
)
const (
BusManagerStartup = "startup"
BusManagerOffer = "offer"
BusManagerAnswer = "answer"
BusManagerBye = "bye"
@ -45,6 +46,7 @@ type BusManager interface { @@ -45,6 +46,7 @@ type BusManager interface {
// A BusTrigger is a container to serialize trigger events
// for the bus backend.
type BusTrigger struct {
Id string
Name string
From string
Payload string `json:",omitempty"`
@ -63,24 +65,29 @@ type busManager struct { @@ -63,24 +65,29 @@ type busManager struct {
// NewBusManager creates and initializes a new BusMager with the
// provided flags for NATS support. It is intended to connect the
// backend bus with a easy to use API to send and receive bus data.
func NewBusManager(useNats bool, subjectPrefix string) BusManager {
func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
var b BusManager
var err error
if useNats {
var err error
b, err = newNatsBus(subjectPrefix)
b, err = newNatsBus(id, subjectPrefix)
if err == nil {
log.Println("Nats bus connected")
} else {
log.Println("Error connecting nats bus", err)
b = &noopBus{}
b = &noopBus{id}
}
} else {
b = &noopBus{}
b = &noopBus{id}
}
if err == nil {
b.Trigger(BusManagerStartup, id, "", nil)
}
return &busManager{b}
}
type noopBus struct {
id string
}
func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error {
@ -88,11 +95,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error @@ -88,11 +95,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error
}
type natsBus struct {
id string
prefix string
ec *nats.EncodedConn
}
func newNatsBus(prefix string) (*natsBus, error) {
func newNatsBus(id, prefix string) (*natsBus, error) {
ec, err := EstablishNatsConnection(nil)
if err != nil {
return nil, err
@ -100,12 +108,13 @@ func newNatsBus(prefix string) (*natsBus, error) { @@ -100,12 +108,13 @@ func newNatsBus(prefix string) (*natsBus, error) {
if prefix == "" {
prefix = "channelling.trigger"
}
return &natsBus{prefix, ec}, nil
return &natsBus{id, prefix, ec}, nil
}
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) {
if bus.ec != nil {
trigger := &BusTrigger{
Id: bus.id,
Name: name,
From: from,
Payload: payload,

3
src/app/spreed-webrtc-server/main.go

@ -319,6 +319,7 @@ func runner(runtime phoenix.Runtime) error { @@ -319,6 +319,7 @@ func runner(runtime phoenix.Runtime) error {
DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
}
}
natsClientId, _ := runtime.GetString("nats", "client_id")
// Load remaining configuration items.
config = NewConfig(runtime, tokenProvider != nil)
@ -421,7 +422,7 @@ func runner(runtime phoenix.Runtime) error { @@ -421,7 +422,7 @@ func runner(runtime phoenix.Runtime) error {
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager)
busManager := NewBusManager(natsChannellingTrigger, natsChannellingTriggerSubject)
busManager := NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager)
// Add handlers.

Loading…
Cancel
Save