diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go index de78df7f..0558cbfd 100644 --- a/go/channelling/bus_manager.go +++ b/go/channelling/bus_manager.go @@ -49,6 +49,8 @@ type BusManager interface { Start() Publish(subject string, v interface{}) error Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error + BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error + BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) @@ -68,6 +70,14 @@ type BusTrigger struct { Pipeline string `json:",omitempty"` } +// A BusRequest is a simple generic to allow sending arbitrary +// Requests to the bus. +type BusRequest struct { + Id string + Type string + Data interface{} `json:",omitempty"` +} + // BusSubjectTrigger returns the bus subject for trigger payloads. func BusSubjectTrigger(prefix, suffix string) string { return fmt.Sprintf("%s.%s", prefix, suffix) @@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string { // NewBusManager creates and initializes a new BusMager with the // provided flags for NATS support. It is intended to connect the // backend bus with a easy to use API to send and receive bus data. -func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { +func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, triggerSubjectPrefix string) BusManager { var b BusManager var err error if useNats { - b, err = newNatsBus(apiConsumer, id, subjectPrefix) + log.Println("NATS enabled", useNats, id) + b, err = newNatsBus(apiConsumer, id, triggerSubjectPrefix) if err == nil { log.Println("NATS bus connected") } else { @@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim return nil } +func (bus *noopBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return nil +} + +func (bus *noopBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + v.Id = bus.id + return nil +} + func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { return nil } @@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink { type natsBus struct { ChannellingAPIConsumer - id string - prefix string - ec *natsconnection.EncodedConnection - triggerQueue chan *busQueueEntry + id string + triggerPrefix string + ec *natsconnection.EncodedConnection + triggerQueue chan *busQueueEntry } -func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { +func newNatsBus(apiConsumer ChannellingAPIConsumer, id, triggerPrefix string) (*natsBus, error) { ec, err := natsconnection.EstablishJSONEncodedConnection(nil) if err != nil { return nil, err } - if prefix == "" { - prefix = "channelling.trigger" - } // Create buffered channel for outbound NATS data. triggerQueue := make(chan *busQueueEntry, 50) - return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil + return &natsBus{apiConsumer, id, triggerPrefix, ec, triggerQueue}, nil } func (bus *natsBus) Start() { @@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim return bus.ec.Request(subject, v, vPtr, timeout) } +func (bus *natsBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { + v.Id = bus.id + return bus.Request(subject, v, vPtr, timeout) +} + +func (bus *natsBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { + if singleTimeout == nil { + singleTimeout = &natsconnection.DefaultRequestTimeout + } + v.Id = bus.id + err := natsconnection.CallFuncWithRetry(func() error { + return bus.Request(subject, v, vPtr, *singleTimeout) + }, timeout) + return err +} + func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { + if bus.triggerPrefix == "" { + // Trigger disabled. + return nil + } + trigger := &BusTrigger{ Id: bus.id, Name: name, @@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli if pipeline != nil { trigger.Pipeline = pipeline.GetID() } - entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} + entry := &busQueueEntry{BusSubjectTrigger(bus.triggerPrefix, name), trigger} select { case bus.triggerQueue <- entry: // sent ok @@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli } func (bus *natsBus) PrefixSubject(sub string) string { - return fmt.Sprintf("%s.%s", bus.prefix, sub) + return fmt.Sprintf("%s.%s", bus.triggerPrefix, sub) } func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { diff --git a/go/natsconnection/natsconnection.go b/go/natsconnection/natsconnection.go index bf64bb7a..6c65990f 100644 --- a/go/natsconnection/natsconnection.go +++ b/go/natsconnection/natsconnection.go @@ -8,11 +8,14 @@ import ( "github.com/nats-io/nats" ) -// DefaultNatsEstablishTimeout is the default timeout for +// DefaultEstablishTimeout is the default timeout for // calls to EstablishNatsConnection. var DefaultEstablishTimeout = 60 * time.Second -// DefaultNatsURL is the default NATS server URL used for +// DefaultRequestTimeout is the default timeout for Request() calls. +var DefaultRequestTimeout = 5 * time.Second + +// DefaultURL is the default NATS server URL used for // calls to NewConnection and EstablishConnection. var DefaultURL = nats.DefaultURL @@ -138,3 +141,58 @@ func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, } return &EncodedConnection{ec}, nil } + +// CallFuncWithRetry retries the given func when it does not return nil +// and the timeout duration has not been reached. It sleeps 1 second between +// each call. If the timeout is 0, the function will be retried forever. +func CallFuncWithRetry(f func() error, timeout time.Duration) error { + errch := make(chan error, 1) + quitch := make(chan bool) + var lastErr error + + // Start our worker loop. + go func() { + for { + select { + case <-quitch: + // Quit requested. + return + default: + // Call our target function. + err := f() + switch err { + case nil: + // No error, success. + errch <- err + return + default: + // Remember last error. + lastErr = err + } + time.Sleep(1 * time.Second) + } + } + }() + + // Create our wait channel. It either is an empty channel or + // it is filled when the timeout gets reached. + var waitch <-chan time.Time + if timeout == 0 { + // Create empty channel to wait forever. + waitch = make(<-chan time.Time) + } else { + waitch = time.After(timeout) + } + + // Wait until something happens, either nil result or timeout. + select { + case err := <-errch: + return err + case <-waitch: + quitch <- true + if lastErr != nil { + return lastErr + } + return errors.New("Call with retry: timeout") + } +} diff --git a/server.conf.in b/server.conf.in index 369f3e3c..04031156 100644 --- a/server.conf.in +++ b/server.conf.in @@ -203,16 +203,29 @@ enabled = false ;allowRegistration = false [nats] +; Set to true, to connect to NATS on startup. If false, all other settins in the +; [nats] section are ignored. Defaults to false. +;useNATS = false ; Set to true, to enable triggering channelling events via NATS ;channelling_trigger = false +; NATS channel trigger subject. Defaults to 'channelling.trigger'. ;channelling_trigger_subject = channelling.trigger -; NATS server URL +; NATS server URL. ;url = nats://127.0.0.1:4222 -; NATS connect establish timeout in seconds +; NATS connect establish timeout in seconds. ;establishTimeout = 60 ; Use client_id to distinguish between multipe servers. The value is sent ; together with every NATS request. Defaults to empty. ;client_id = +; Set to true, to load additional configuration settings via NATS. The config +; loaded is applied after loading the [nats] section and all entries extend the +; configuration files with overwrite. Only takes effect when a NATS server URL +; is configured. Defaults to false. +;configFromNATS = false +;configFromNATSSubject = spreed-webrtc.config.get +; NATS config load timeout. If 0, then it waits forever, blocking the startup +; until the configuration was loaded from NATS. Defaults to 0. +;configFromNATSTimeout = 0 [roomtypes] ; You can define room types that should be used for given room names instead of diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 44b83dcd..435db53e 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -62,6 +62,49 @@ var config *channelling.Config func runner(runtime phoenix.Runtime) error { log.SetFlags(log.LstdFlags | log.Lmicroseconds) + // Nats pub/sub supports. + useNats := runtime.GetBoolDefault("nats", "useNATS", false) + if useNats { + if natsURL, err := runtime.GetString("nats", "url"); err == nil { + if natsURL != "" { + natsconnection.DefaultURL = natsURL + } + } + if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { + if natsEstablishTimeout != 0 { + natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second + } + } + } + natsClientId := runtime.GetStringDefault("nats", "client_id", "") + var natsChannellingTriggerSubject string + if runtime.GetBoolDefault("nats", "channelling_trigger", false) { + natsChannellingTriggerSubject = runtime.GetStringDefault("nats", "channelling_trigger_subject", "channelling.trigger") + } + + // Base services. + apiConsumer := channelling.NewChannellingAPIConsumer() + busManager := channelling.NewBusManager(apiConsumer, natsClientId, useNats, natsChannellingTriggerSubject) + + // Update configuration from NATS. + if useNats && runtime.GetBoolDefault("nats", "configFromNATS", false) { + log.Println("Fetching configuration from NATS") + configFromNATSSubject := runtime.GetStringDefault("nats", "configFromNATSSubject", "spreed-webrtc.config.get") + configFromNATSTimeout := time.Duration(runtime.GetIntDefault("nats", "configFromNATSTimeout", 0)) * time.Second + // Receive config from bus. + var req = &channelling.BusRequest{} + var res map[string]map[string]string + if err := busManager.BusRequestWithRetry(configFromNATSSubject, req, &res, configFromNATSTimeout, nil); err == nil { + if updateErr := runtime.Update(res); updateErr == nil { + log.Println("Updated configuration from NATS") + } else { + log.Println("Failed to update config with NATS updates", updateErr) + } + } else { + log.Println("Failed to fetch config from NATS", err) + } + } + rootFolder, err := runtime.GetString("http", "root") if err != nil { cwd, err2 := os.Getwd() @@ -158,21 +201,6 @@ func runner(runtime phoenix.Runtime) error { tokenProvider = channelling.TokenFileProvider(tokenFile) } - // Nats pub/sub supports. - natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger") - natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") - if natsURL, err := runtime.GetString("nats", "url"); err == nil { - if natsURL != "" { - natsconnection.DefaultURL = natsURL - } - } - if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { - if natsEstablishTimeout != 0 { - natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second - } - } - natsClientId, _ := runtime.GetString("nats", "client_id") - // Load remaining configuration items. config, err = server.NewConfig(runtime, tokenProvider != nil) if err != nil { @@ -282,7 +310,6 @@ func runner(runtime phoenix.Runtime) error { } // Prepare services. - apiConsumer := channelling.NewChannellingAPIConsumer() buddyImages := channelling.NewImageCache() codec := channelling.NewCodec(incomingCodecLimit) roomManager := channelling.NewRoomManager(config, codec) @@ -290,7 +317,6 @@ func runner(runtime phoenix.Runtime) error { tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) - busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) if err := roomManager.SetBusManager(busManager); err != nil { return err