From 491d12be1cb69cdaa0186cd33630898abd602d2f Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Fri, 7 Oct 2016 15:05:35 +0200 Subject: [PATCH] Implement config loading via NATS On server startup the configuration files are loaded as normal. NATS is connected first and if enabled, additional configuration is loaded with a NATS request. This makes it possible to provide additional configuration from other services. NATS configuration is only loaded on startup. So whenever the NATS configuration provider changes the webrtc configuration, it has to trigger a restart of Spreed WebRTC externally. --- go/channelling/bus_manager.go | 65 ++++++++++++++++++++++------ go/natsconnection/natsconnection.go | 62 +++++++++++++++++++++++++- server.conf.in | 17 +++++++- src/app/spreed-webrtc-server/main.go | 60 +++++++++++++++++-------- 4 files changed, 170 insertions(+), 34 deletions(-) diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go index de78df7f..0558cbfd 100644 --- a/go/channelling/bus_manager.go +++ b/go/channelling/bus_manager.go @@ -49,6 +49,8 @@ type BusManager interface { Start() Publish(subject string, v interface{}) error Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error + BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error + BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) @@ -68,6 +70,14 @@ type BusTrigger struct { Pipeline string `json:",omitempty"` } +// A BusRequest is a simple generic to allow sending arbitrary +// Requests to the bus. +type BusRequest struct { + Id string + Type string + Data interface{} `json:",omitempty"` +} + // BusSubjectTrigger returns the bus subject for trigger payloads. func BusSubjectTrigger(prefix, suffix string) string { return fmt.Sprintf("%s.%s", prefix, suffix) @@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string { // NewBusManager creates and initializes a new BusMager with the // provided flags for NATS support. It is intended to connect the // backend bus with a easy to use API to send and receive bus data. -func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { +func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, triggerSubjectPrefix string) BusManager { var b BusManager var err error if useNats { - b, err = newNatsBus(apiConsumer, id, subjectPrefix) + log.Println("NATS enabled", useNats, id) + b, err = newNatsBus(apiConsumer, id, triggerSubjectPrefix) if err == nil { log.Println("NATS bus connected") } else { @@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim return nil } +func (bus *noopBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return nil +} + +func (bus *noopBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + v.Id = bus.id + return nil +} + func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { return nil } @@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink { type natsBus struct { ChannellingAPIConsumer - id string - prefix string - ec *natsconnection.EncodedConnection - triggerQueue chan *busQueueEntry + id string + triggerPrefix string + ec *natsconnection.EncodedConnection + triggerQueue chan *busQueueEntry } -func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { +func newNatsBus(apiConsumer ChannellingAPIConsumer, id, triggerPrefix string) (*natsBus, error) { ec, err := natsconnection.EstablishJSONEncodedConnection(nil) if err != nil { return nil, err } - if prefix == "" { - prefix = "channelling.trigger" - } // Create buffered channel for outbound NATS data. triggerQueue := make(chan *busQueueEntry, 50) - return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil + return &natsBus{apiConsumer, id, triggerPrefix, ec, triggerQueue}, nil } func (bus *natsBus) Start() { @@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim return bus.ec.Request(subject, v, vPtr, timeout) } +func (bus *natsBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return bus.Request(subject, v, vPtr, timeout) +} + +func (bus *natsBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + if singleTimeout == nil { + singleTimeout = &natsconnection.DefaultRequestTimeout + } + v.Id = bus.id + err := natsconnection.CallFuncWithRetry(func() error { + return bus.Request(subject, v, vPtr, *singleTimeout) + }, timeout) + return err +} + func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { + if bus.triggerPrefix == "" { + // Trigger disabled. + return nil + } + trigger := &BusTrigger{ Id: bus.id, Name: name, @@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli if pipeline != nil { trigger.Pipeline = pipeline.GetID() } - entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} + entry := &busQueueEntry{BusSubjectTrigger(bus.triggerPrefix, name), trigger} select { case bus.triggerQueue <- entry: // sent ok @@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli } func (bus *natsBus) PrefixSubject(sub string) string { - return fmt.Sprintf("%s.%s", bus.prefix, sub) + return fmt.Sprintf("%s.%s", bus.triggerPrefix, sub) } func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { diff --git a/go/natsconnection/natsconnection.go b/go/natsconnection/natsconnection.go index bf64bb7a..6c65990f 100644 --- a/go/natsconnection/natsconnection.go +++ b/go/natsconnection/natsconnection.go @@ -8,11 +8,14 @@ import ( "github.com/nats-io/nats" ) -// DefaultNatsEstablishTimeout is the default timeout for +// DefaultEstablishTimeout is the default timeout for // calls to EstablishNatsConnection. var DefaultEstablishTimeout = 60 * time.Second -// DefaultNatsURL is the default NATS server URL used for +// DefaultRequestTimeout is the default timeout for Request() calls. +var DefaultRequestTimeout = 5 * time.Second + +// DefaultURL is the default NATS server URL used for // calls to NewConnection and EstablishConnection. var DefaultURL = nats.DefaultURL @@ -138,3 +141,58 @@ func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, } return &EncodedConnection{ec}, nil } + +// CallFuncWithRetry retries the given func when it does not return nil +// and the timeout duration has not been reached. It sleeps 1 second between +// each call. If the timeout is 0, the function will be retried forever. +func CallFuncWithRetry(f func() error, timeout time.Duration) error { + errch := make(chan error, 1) + quitch := make(chan bool) + var lastErr error + + // Start our worker loop. + go func() { + for { + select { + case <-quitch: + // Quit requested. + return + default: + // Call our target function. + err := f() + switch err { + case nil: + // No error, success. + errch <- err + return + default: + // Remember last error. + lastErr = err + } + time.Sleep(1 * time.Second) + } + } + }() + + // Create our wait channel. It either is an empty channel or + // it is filled when the timeout gets reached. + var waitch <-chan time.Time + if timeout == 0 { + // Create empty channel to wait forever. + waitch = make(<-chan time.Time) + } else { + waitch = time.After(timeout) + } + + // Wait until something happens, either nil result or timeout. + select { + case err := <-errch: + return err + case <-waitch: + quitch <- true + if lastErr != nil { + return lastErr + } + return errors.New("Call with retry: timeout") + } +} diff --git a/server.conf.in b/server.conf.in index 369f3e3c..04031156 100644 --- a/server.conf.in +++ b/server.conf.in @@ -203,16 +203,29 @@ enabled = false ;allowRegistration = false [nats] +; Set to true, to connect to NATS on startup. If false, all other settins in the +; [nats] section are ignored. Defaults to false. +;useNATS = false ; Set to true, to enable triggering channelling events via NATS ;channelling_trigger = false +; NATS channel trigger subject. Defaults to 'channelling.trigger'. ;channelling_trigger_subject = channelling.trigger -; NATS server URL +; NATS server URL. ;url = nats://127.0.0.1:4222 -; NATS connect establish timeout in seconds +; NATS connect establish timeout in seconds. ;establishTimeout = 60 ; Use client_id to distinguish between multipe servers. The value is sent ; together with every NATS request. Defaults to empty. ;client_id = +; Set to true, to load additional configuration settings via NATS. The config +; loaded is applied after loading the [nats] section and all entries extend the +; configuration files with overwrite. Only takes effect when a NATS server URL +; is configured. Defaults to false. +;configFromNATS = false +;configFromNATSSubject = spreed-webrtc.config.get +; NATS config load timeout. If 0, then it waits forever, blocking the startup +; until the configuration was loaded from NATS. Defaults to 0. +;configFromNATSTimeout = 0 [roomtypes] ; You can define room types that should be used for given room names instead of diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 44b83dcd..435db53e 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -62,6 +62,49 @@ var config *channelling.Config func runner(runtime phoenix.Runtime) error { log.SetFlags(log.LstdFlags | log.Lmicroseconds) + // Nats pub/sub supports. + useNats := runtime.GetBoolDefault("nats", "useNATS", false) + if useNats { + if natsURL, err := runtime.GetString("nats", "url"); err == nil { + if natsURL != "" { + natsconnection.DefaultURL = natsURL + } + } + if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { + if natsEstablishTimeout != 0 { + natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second + } + } + } + natsClientId := runtime.GetStringDefault("nats", "client_id", "") + var natsChannellingTriggerSubject string + if runtime.GetBoolDefault("nats", "channelling_trigger", false) { + natsChannellingTriggerSubject = runtime.GetStringDefault("nats", "channelling_trigger_subject", "channelling.trigger") + } + + // Base services. + apiConsumer := channelling.NewChannellingAPIConsumer() + busManager := channelling.NewBusManager(apiConsumer, natsClientId, useNats, natsChannellingTriggerSubject) + + // Update configuration from NATS. + if useNats && runtime.GetBoolDefault("nats", "configFromNATS", false) { + log.Println("Fetching configuration from NATS") + configFromNATSSubject := runtime.GetStringDefault("nats", "configFromNATSSubject", "spreed-webrtc.config.get") + configFromNATSTimeout := time.Duration(runtime.GetIntDefault("nats", "configFromNATSTimeout", 0)) * time.Second + // Receive config from bus. + var req = &channelling.BusRequest{} + var res map[string]map[string]string + if err := busManager.BusRequestWithRetry(configFromNATSSubject, req, &res, configFromNATSTimeout, nil); err == nil { + if updateErr := runtime.Update(res); updateErr == nil { + log.Println("Updated configuration from NATS") + } else { + log.Println("Failed to update config with NATS updates", updateErr) + } + } else { + log.Println("Failed to fetch config from NATS", err) + } + } + rootFolder, err := runtime.GetString("http", "root") if err != nil { cwd, err2 := os.Getwd() @@ -158,21 +201,6 @@ func runner(runtime phoenix.Runtime) error { tokenProvider = channelling.TokenFileProvider(tokenFile) } - // Nats pub/sub supports. - natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger") - natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") - if natsURL, err := runtime.GetString("nats", "url"); err == nil { - if natsURL != "" { - natsconnection.DefaultURL = natsURL - } - } - if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { - if natsEstablishTimeout != 0 { - natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second - } - } - natsClientId, _ := runtime.GetString("nats", "client_id") - // Load remaining configuration items. config, err = server.NewConfig(runtime, tokenProvider != nil) if err != nil { @@ -282,7 +310,6 @@ func runner(runtime phoenix.Runtime) error { } // Prepare services. - apiConsumer := channelling.NewChannellingAPIConsumer() buddyImages := channelling.NewImageCache() codec := channelling.NewCodec(incomingCodecLimit) roomManager := channelling.NewRoomManager(config, codec) @@ -290,7 +317,6 @@ func runner(runtime phoenix.Runtime) error { tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) - busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) if err := roomManager.SetBusManager(busManager); err != nil { return err