From 491d12be1cb69cdaa0186cd33630898abd602d2f Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Fri, 7 Oct 2016 15:05:35 +0200 Subject: [PATCH 1/2] Implement config loading via NATS On server startup the configuration files are loaded as normal. NATS is connected first and if enabled, additional configuration is loaded with a NATS request. This makes it possible to provide additional configuration from other services. NATS configuration is only loaded on startup. So whenever the NATS configuration provider changes the webrtc configuration, it has to trigger a restart of Spreed WebRTC externally. --- go/channelling/bus_manager.go | 65 ++++++++++++++++++++++------ go/natsconnection/natsconnection.go | 62 +++++++++++++++++++++++++- server.conf.in | 17 +++++++- src/app/spreed-webrtc-server/main.go | 60 +++++++++++++++++-------- 4 files changed, 170 insertions(+), 34 deletions(-) diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go index de78df7f..0558cbfd 100644 --- a/go/channelling/bus_manager.go +++ b/go/channelling/bus_manager.go @@ -49,6 +49,8 @@ type BusManager interface { Start() Publish(subject string, v interface{}) error Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error + BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error + BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) @@ -68,6 +70,14 @@ type BusTrigger struct { Pipeline string `json:",omitempty"` } +// A BusRequest is a simple generic to allow sending arbitrary +// Requests to the bus. +type BusRequest struct { + Id string + Type string + Data interface{} `json:",omitempty"` +} + // BusSubjectTrigger returns the bus subject for trigger payloads. func BusSubjectTrigger(prefix, suffix string) string { return fmt.Sprintf("%s.%s", prefix, suffix) @@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string { // NewBusManager creates and initializes a new BusMager with the // provided flags for NATS support. It is intended to connect the // backend bus with a easy to use API to send and receive bus data. -func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { +func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, triggerSubjectPrefix string) BusManager { var b BusManager var err error if useNats { - b, err = newNatsBus(apiConsumer, id, subjectPrefix) + log.Println("NATS enabled", useNats, id) + b, err = newNatsBus(apiConsumer, id, triggerSubjectPrefix) if err == nil { log.Println("NATS bus connected") } else { @@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim return nil } +func (bus *noopBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return nil +} + +func (bus *noopBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + v.Id = bus.id + return nil +} + func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { return nil } @@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink { type natsBus struct { ChannellingAPIConsumer - id string - prefix string - ec *natsconnection.EncodedConnection - triggerQueue chan *busQueueEntry + id string + triggerPrefix string + ec *natsconnection.EncodedConnection + triggerQueue chan *busQueueEntry } -func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { +func newNatsBus(apiConsumer ChannellingAPIConsumer, id, triggerPrefix string) (*natsBus, error) { ec, err := natsconnection.EstablishJSONEncodedConnection(nil) if err != nil { return nil, err } - if prefix == "" { - prefix = "channelling.trigger" - } // Create buffered channel for outbound NATS data. triggerQueue := make(chan *busQueueEntry, 50) - return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil + return &natsBus{apiConsumer, id, triggerPrefix, ec, triggerQueue}, nil } func (bus *natsBus) Start() { @@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim return bus.ec.Request(subject, v, vPtr, timeout) } +func (bus *natsBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return bus.Request(subject, v, vPtr, timeout) +} + +func (bus *natsBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + if singleTimeout == nil { + singleTimeout = &natsconnection.DefaultRequestTimeout + } + v.Id = bus.id + err := natsconnection.CallFuncWithRetry(func() error { + return bus.Request(subject, v, vPtr, *singleTimeout) + }, timeout) + return err +} + func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { + if bus.triggerPrefix == "" { + // Trigger disabled. + return nil + } + trigger := &BusTrigger{ Id: bus.id, Name: name, @@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli if pipeline != nil { trigger.Pipeline = pipeline.GetID() } - entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} + entry := &busQueueEntry{BusSubjectTrigger(bus.triggerPrefix, name), trigger} select { case bus.triggerQueue <- entry: // sent ok @@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli } func (bus *natsBus) PrefixSubject(sub string) string { - return fmt.Sprintf("%s.%s", bus.prefix, sub) + return fmt.Sprintf("%s.%s", bus.triggerPrefix, sub) } func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { diff --git a/go/natsconnection/natsconnection.go b/go/natsconnection/natsconnection.go index bf64bb7a..6c65990f 100644 --- a/go/natsconnection/natsconnection.go +++ b/go/natsconnection/natsconnection.go @@ -8,11 +8,14 @@ import ( "github.com/nats-io/nats" ) -// DefaultNatsEstablishTimeout is the default timeout for +// DefaultEstablishTimeout is the default timeout for // calls to EstablishNatsConnection. var DefaultEstablishTimeout = 60 * time.Second -// DefaultNatsURL is the default NATS server URL used for +// DefaultRequestTimeout is the default timeout for Request() calls. +var DefaultRequestTimeout = 5 * time.Second + +// DefaultURL is the default NATS server URL used for // calls to NewConnection and EstablishConnection. var DefaultURL = nats.DefaultURL @@ -138,3 +141,58 @@ func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, } return &EncodedConnection{ec}, nil } + +// CallFuncWithRetry retries the given func when it does not return nil +// and the timeout duration has not been reached. It sleeps 1 second between +// each call. If the timeout is 0, the function will be retried forever. +func CallFuncWithRetry(f func() error, timeout time.Duration) error { + errch := make(chan error, 1) + quitch := make(chan bool) + var lastErr error + + // Start our worker loop. + go func() { + for { + select { + case <-quitch: + // Quit requested. + return + default: + // Call our target function. + err := f() + switch err { + case nil: + // No error, success. + errch <- err + return + default: + // Remember last error. + lastErr = err + } + time.Sleep(1 * time.Second) + } + } + }() + + // Create our wait channel. It either is an empty channel or + // it is filled when the timeout gets reached. + var waitch <-chan time.Time + if timeout == 0 { + // Create empty channel to wait forever. + waitch = make(<-chan time.Time) + } else { + waitch = time.After(timeout) + } + + // Wait until something happens, either nil result or timeout. + select { + case err := <-errch: + return err + case <-waitch: + quitch <- true + if lastErr != nil { + return lastErr + } + return errors.New("Call with retry: timeout") + } +} diff --git a/server.conf.in b/server.conf.in index 369f3e3c..04031156 100644 --- a/server.conf.in +++ b/server.conf.in @@ -203,16 +203,29 @@ enabled = false ;allowRegistration = false [nats] +; Set to true, to connect to NATS on startup. If false, all other settins in the +; [nats] section are ignored. Defaults to false. +;useNATS = false ; Set to true, to enable triggering channelling events via NATS ;channelling_trigger = false +; NATS channel trigger subject. Defaults to 'channelling.trigger'. ;channelling_trigger_subject = channelling.trigger -; NATS server URL +; NATS server URL. ;url = nats://127.0.0.1:4222 -; NATS connect establish timeout in seconds +; NATS connect establish timeout in seconds. ;establishTimeout = 60 ; Use client_id to distinguish between multipe servers. The value is sent ; together with every NATS request. Defaults to empty. ;client_id = +; Set to true, to load additional configuration settings via NATS. The config +; loaded is applied after loading the [nats] section and all entries extend the +; configuration files with overwrite. Only takes effect when a NATS server URL +; is configured. Defaults to false. +;configFromNATS = false +;configFromNATSSubject = spreed-webrtc.config.get +; NATS config load timeout. If 0, then it waits forever, blocking the startup +; until the configuration was loaded from NATS. Defaults to 0. +;configFromNATSTimeout = 0 [roomtypes] ; You can define room types that should be used for given room names instead of diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 44b83dcd..435db53e 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -62,6 +62,49 @@ var config *channelling.Config func runner(runtime phoenix.Runtime) error { log.SetFlags(log.LstdFlags | log.Lmicroseconds) + // Nats pub/sub supports. + useNats := runtime.GetBoolDefault("nats", "useNATS", false) + if useNats { + if natsURL, err := runtime.GetString("nats", "url"); err == nil { + if natsURL != "" { + natsconnection.DefaultURL = natsURL + } + } + if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { + if natsEstablishTimeout != 0 { + natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second + } + } + } + natsClientId := runtime.GetStringDefault("nats", "client_id", "") + var natsChannellingTriggerSubject string + if runtime.GetBoolDefault("nats", "channelling_trigger", false) { + natsChannellingTriggerSubject = runtime.GetStringDefault("nats", "channelling_trigger_subject", "channelling.trigger") + } + + // Base services. + apiConsumer := channelling.NewChannellingAPIConsumer() + busManager := channelling.NewBusManager(apiConsumer, natsClientId, useNats, natsChannellingTriggerSubject) + + // Update configuration from NATS. + if useNats && runtime.GetBoolDefault("nats", "configFromNATS", false) { + log.Println("Fetching configuration from NATS") + configFromNATSSubject := runtime.GetStringDefault("nats", "configFromNATSSubject", "spreed-webrtc.config.get") + configFromNATSTimeout := time.Duration(runtime.GetIntDefault("nats", "configFromNATSTimeout", 0)) * time.Second + // Receive config from bus. + var req = &channelling.BusRequest{} + var res map[string]map[string]string + if err := busManager.BusRequestWithRetry(configFromNATSSubject, req, &res, configFromNATSTimeout, nil); err == nil { + if updateErr := runtime.Update(res); updateErr == nil { + log.Println("Updated configuration from NATS") + } else { + log.Println("Failed to update config with NATS updates", updateErr) + } + } else { + log.Println("Failed to fetch config from NATS", err) + } + } + rootFolder, err := runtime.GetString("http", "root") if err != nil { cwd, err2 := os.Getwd() @@ -158,21 +201,6 @@ func runner(runtime phoenix.Runtime) error { tokenProvider = channelling.TokenFileProvider(tokenFile) } - // Nats pub/sub supports. - natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger") - natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") - if natsURL, err := runtime.GetString("nats", "url"); err == nil { - if natsURL != "" { - natsconnection.DefaultURL = natsURL - } - } - if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { - if natsEstablishTimeout != 0 { - natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second - } - } - natsClientId, _ := runtime.GetString("nats", "client_id") - // Load remaining configuration items. config, err = server.NewConfig(runtime, tokenProvider != nil) if err != nil { @@ -282,7 +310,6 @@ func runner(runtime phoenix.Runtime) error { } // Prepare services. - apiConsumer := channelling.NewChannellingAPIConsumer() buddyImages := channelling.NewImageCache() codec := channelling.NewCodec(incomingCodecLimit) roomManager := channelling.NewRoomManager(config, codec) @@ -290,7 +317,6 @@ func runner(runtime phoenix.Runtime) error { tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) - busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) if err := roomManager.SetBusManager(busManager); err != nil { return err From 9cebfe48209db8f4ba3de3968451fdd5073f2a67 Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Fri, 7 Oct 2016 15:27:23 +0200 Subject: [PATCH 2/2] Update dependencies to phoenix with Update support --- dependencies.tsv | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies.tsv b/dependencies.tsv index 651d744d..99dcf131 100644 --- a/dependencies.tsv +++ b/dependencies.tsv @@ -8,6 +8,6 @@ github.com/nats-io/nats git 355b5b97e0842dc94f1106729aa88e33e06317ca 2015-12-09T github.com/satori/go.uuid git 879c5887cd475cd7864858769793b2ceb0d44feb 2016-06-07T14:43:47Z github.com/strukturag/goacceptlanguageparser git 68066e68c2940059aadc6e19661610cf428b6647 2014-02-13T13:31:23Z github.com/strukturag/httputils git afbf05c71ac03ee7989c96d033a9571ba4ded468 2014-07-02T01:35:33Z -github.com/strukturag/phoenix git 31b7f25f4815e6e0b8e7c4010f6e9a71c4165b19 2016-06-01T11:34:58Z +github.com/strukturag/phoenix git 8c65e1692d19e1ea84c79d8346bee5747c8ef69c 2016-10-05T15:12:02Z github.com/strukturag/sloth git 74a8bcf67368de59baafe5d3e17aee9875564cfc 2015-04-22T08:59:42Z github.com/strukturag/spreed-turnservicecli git dcbf3f8eab1c36d4d0e4c6f9b6dce4f060543224 2016-08-29T09:55:47Z