From 336879b81884538838292d0e97ba3e8e4a8f8ade Mon Sep 17 00:00:00 2001 From: Simon Eisenmann <simon@struktur.de> Date: Wed, 2 Mar 2016 09:58:24 +0100 Subject: [PATCH] Trigger NATS events non blocking through buffered channel. --- src/app/spreed-webrtc-server/bus_manager.go | 39 +++++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/app/spreed-webrtc-server/bus_manager.go b/src/app/spreed-webrtc-server/bus_manager.go index 87b809e4..1f70b7c8 100644 --- a/src/app/spreed-webrtc-server/bus_manager.go +++ b/src/app/spreed-webrtc-server/bus_manager.go @@ -22,6 +22,7 @@ package main import ( + "errors" "fmt" "log" @@ -95,9 +96,10 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error } type natsBus struct { - id string - prefix string - ec *nats.EncodedConn + id string + prefix string + ec *nats.EncodedConn + triggerQueue chan *busQueueEntry } func newNatsBus(id, prefix string) (*natsBus, error) { @@ -108,7 +110,11 @@ func newNatsBus(id, prefix string) (*natsBus, error) { if prefix == "" { prefix = "channelling.trigger" } - return &natsBus{id, prefix, ec}, nil + // Create buffered channel for outbound NATS data. + triggerQueue := make(chan *busQueueEntry, 50) + // Start go routine to process outbount NATS publishing. + go chPublish(ec, triggerQueue) + return &natsBus{id, prefix, ec, triggerQueue}, nil } func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { @@ -120,10 +126,29 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e Payload: payload, Data: data, } - err = bus.ec.Publish(BusSubjectTrigger(bus.prefix, name), trigger) - if err != nil { - log.Println("Failed to trigger NATS event", err) + entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} + select { + case bus.triggerQueue <- entry: + // sent ok + default: + log.Println("Failed to queue NATS event - queue full?") + err = errors.New("NATS trigger queue full") } } return err } + +type busQueueEntry struct { + subject string + data interface{} +} + +func chPublish(ec *nats.EncodedConn, channel chan (*busQueueEntry)) { + for { + entry := <-channel + err := ec.Publish(entry.subject, entry.data) + if err != nil { + log.Println("Failed to publish to NATS", entry.subject, err) + } + } +}