|
|
|
@ -22,6 +22,7 @@
@@ -22,6 +22,7 @@
|
|
|
|
|
package main |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"log" |
|
|
|
|
|
|
|
|
@ -95,9 +96,10 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error
@@ -95,9 +96,10 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type natsBus struct { |
|
|
|
|
id string |
|
|
|
|
prefix string |
|
|
|
|
ec *nats.EncodedConn |
|
|
|
|
id string |
|
|
|
|
prefix string |
|
|
|
|
ec *nats.EncodedConn |
|
|
|
|
triggerQueue chan *busQueueEntry |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newNatsBus(id, prefix string) (*natsBus, error) { |
|
|
|
@ -108,7 +110,11 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
@@ -108,7 +110,11 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
|
|
|
|
|
if prefix == "" { |
|
|
|
|
prefix = "channelling.trigger" |
|
|
|
|
} |
|
|
|
|
return &natsBus{id, prefix, ec}, nil |
|
|
|
|
// Create buffered channel for outbound NATS data.
|
|
|
|
|
triggerQueue := make(chan *busQueueEntry, 50) |
|
|
|
|
// Start go routine to process outbount NATS publishing.
|
|
|
|
|
go chPublish(ec, triggerQueue) |
|
|
|
|
return &natsBus{id, prefix, ec, triggerQueue}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { |
|
|
|
@ -120,10 +126,29 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e
@@ -120,10 +126,29 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e
|
|
|
|
|
Payload: payload, |
|
|
|
|
Data: data, |
|
|
|
|
} |
|
|
|
|
err = bus.ec.Publish(BusSubjectTrigger(bus.prefix, name), trigger) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Println("Failed to trigger NATS event", err) |
|
|
|
|
entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} |
|
|
|
|
select { |
|
|
|
|
case bus.triggerQueue <- entry: |
|
|
|
|
// sent ok
|
|
|
|
|
default: |
|
|
|
|
log.Println("Failed to queue NATS event - queue full?") |
|
|
|
|
err = errors.New("NATS trigger queue full") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type busQueueEntry struct { |
|
|
|
|
subject string |
|
|
|
|
data interface{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func chPublish(ec *nats.EncodedConn, channel chan (*busQueueEntry)) { |
|
|
|
|
for { |
|
|
|
|
entry := <-channel |
|
|
|
|
err := ec.Publish(entry.subject, entry.data) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Println("Failed to publish to NATS", entry.subject, err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|