From d782eb26d845c924fe5e1e22630bdd17dd71d8c5 Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Tue, 1 Mar 2016 10:36:01 +0100 Subject: [PATCH] Added startup bus event and a NATS client id. --- server.conf.in | 5 ++++- src/app/spreed-webrtc-server/bus_manager.go | 23 ++++++++++++++------- src/app/spreed-webrtc-server/main.go | 3 ++- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/server.conf.in b/server.conf.in index b9727df6..bbab48d2 100644 --- a/server.conf.in +++ b/server.conf.in @@ -202,4 +202,7 @@ enabled = false ; NATS server URL ;url = nats://127.0.0.1:4222 ; NATS connect establish timeout in seconds -;establishTimeout = 60 \ No newline at end of file +;establishTimeout = 60 +; Use client_id to distinguish between multipe servers. The value is sent +; together with every NATS request. Defaults to empty. +;client_id = diff --git a/src/app/spreed-webrtc-server/bus_manager.go b/src/app/spreed-webrtc-server/bus_manager.go index b7bc2067..87b809e4 100644 --- a/src/app/spreed-webrtc-server/bus_manager.go +++ b/src/app/spreed-webrtc-server/bus_manager.go @@ -29,6 +29,7 @@ import ( ) const ( + BusManagerStartup = "startup" BusManagerOffer = "offer" BusManagerAnswer = "answer" BusManagerBye = "bye" @@ -45,6 +46,7 @@ type BusManager interface { // A BusTrigger is a container to serialize trigger events // for the bus backend. type BusTrigger struct { + Id string Name string From string Payload string `json:",omitempty"` @@ -63,24 +65,29 @@ type busManager struct { // NewBusManager creates and initializes a new BusMager with the // provided flags for NATS support. It is intended to connect the // backend bus with a easy to use API to send and receive bus data. -func NewBusManager(useNats bool, subjectPrefix string) BusManager { +func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager { var b BusManager + var err error if useNats { - var err error - b, err = newNatsBus(subjectPrefix) + b, err = newNatsBus(id, subjectPrefix) if err == nil { log.Println("Nats bus connected") } else { log.Println("Error connecting nats bus", err) - b = &noopBus{} + b = &noopBus{id} } } else { - b = &noopBus{} + b = &noopBus{id} } + if err == nil { + b.Trigger(BusManagerStartup, id, "", nil) + } + return &busManager{b} } type noopBus struct { + id string } func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error { @@ -88,11 +95,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error } type natsBus struct { + id string prefix string ec *nats.EncodedConn } -func newNatsBus(prefix string) (*natsBus, error) { +func newNatsBus(id, prefix string) (*natsBus, error) { ec, err := EstablishNatsConnection(nil) if err != nil { return nil, err @@ -100,12 +108,13 @@ func newNatsBus(prefix string) (*natsBus, error) { if prefix == "" { prefix = "channelling.trigger" } - return &natsBus{prefix, ec}, nil + return &natsBus{id, prefix, ec}, nil } func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { if bus.ec != nil { trigger := &BusTrigger{ + Id: bus.id, Name: name, From: from, Payload: payload, diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index e32a0f0c..181db45f 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -319,6 +319,7 @@ func runner(runtime phoenix.Runtime) error { DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second } } + natsClientId, _ := runtime.GetString("nats", "client_id") // Load remaining configuration items. config = NewConfig(runtime, tokenProvider != nil) @@ -421,7 +422,7 @@ func runner(runtime phoenix.Runtime) error { tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := NewStatsManager(hub, roomManager, sessionManager) - busManager := NewBusManager(natsChannellingTrigger, natsChannellingTriggerSubject) + busManager := NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) // Add handlers.