|
|
|
|
@ -49,6 +49,8 @@ type BusManager interface {
@@ -49,6 +49,8 @@ type BusManager interface {
|
|
|
|
|
Start() |
|
|
|
|
Publish(subject string, v interface{}) error |
|
|
|
|
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error |
|
|
|
|
BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error |
|
|
|
|
BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error |
|
|
|
|
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error |
|
|
|
|
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) |
|
|
|
|
BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) |
|
|
|
|
@ -68,6 +70,14 @@ type BusTrigger struct {
@@ -68,6 +70,14 @@ type BusTrigger struct {
|
|
|
|
|
Pipeline string `json:",omitempty"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// A BusRequest is a simple generic to allow sending arbitrary
|
|
|
|
|
// Requests to the bus.
|
|
|
|
|
type BusRequest struct { |
|
|
|
|
Id string |
|
|
|
|
Type string |
|
|
|
|
Data interface{} `json:",omitempty"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// BusSubjectTrigger returns the bus subject for trigger payloads.
|
|
|
|
|
func BusSubjectTrigger(prefix, suffix string) string { |
|
|
|
|
return fmt.Sprintf("%s.%s", prefix, suffix) |
|
|
|
|
@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string {
@@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string {
|
|
|
|
|
// NewBusManager creates and initializes a new BusMager with the
|
|
|
|
|
// provided flags for NATS support. It is intended to connect the
|
|
|
|
|
// backend bus with a easy to use API to send and receive bus data.
|
|
|
|
|
func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { |
|
|
|
|
func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, triggerSubjectPrefix string) BusManager { |
|
|
|
|
var b BusManager |
|
|
|
|
var err error |
|
|
|
|
if useNats { |
|
|
|
|
b, err = newNatsBus(apiConsumer, id, subjectPrefix) |
|
|
|
|
log.Println("NATS enabled", useNats, id) |
|
|
|
|
b, err = newNatsBus(apiConsumer, id, triggerSubjectPrefix) |
|
|
|
|
if err == nil { |
|
|
|
|
log.Println("NATS bus connected") |
|
|
|
|
} else { |
|
|
|
|
@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim
@@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *noopBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { |
|
|
|
|
v.Id = bus.id |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *noopBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { |
|
|
|
|
v.Id = bus.id |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink {
@@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink {
|
|
|
|
|
|
|
|
|
|
type natsBus struct { |
|
|
|
|
ChannellingAPIConsumer |
|
|
|
|
id string |
|
|
|
|
prefix string |
|
|
|
|
ec *natsconnection.EncodedConnection |
|
|
|
|
triggerQueue chan *busQueueEntry |
|
|
|
|
id string |
|
|
|
|
triggerPrefix string |
|
|
|
|
ec *natsconnection.EncodedConnection |
|
|
|
|
triggerQueue chan *busQueueEntry |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { |
|
|
|
|
func newNatsBus(apiConsumer ChannellingAPIConsumer, id, triggerPrefix string) (*natsBus, error) { |
|
|
|
|
ec, err := natsconnection.EstablishJSONEncodedConnection(nil) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
if prefix == "" { |
|
|
|
|
prefix = "channelling.trigger" |
|
|
|
|
} |
|
|
|
|
// Create buffered channel for outbound NATS data.
|
|
|
|
|
triggerQueue := make(chan *busQueueEntry, 50) |
|
|
|
|
|
|
|
|
|
return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil |
|
|
|
|
return &natsBus{apiConsumer, id, triggerPrefix, ec, triggerQueue}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) Start() { |
|
|
|
|
@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim
@@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim
|
|
|
|
|
return bus.ec.Request(subject, v, vPtr, timeout) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) BusRequest(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration) error { |
|
|
|
|
v.Id = bus.id |
|
|
|
|
return bus.Request(subject, v, vPtr, timeout) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) BusRequestWithRetry(subject string, v *BusRequest, vPtr interface{}, timeout time.Duration, singleTimeout *time.Duration) error { |
|
|
|
|
if singleTimeout == nil { |
|
|
|
|
singleTimeout = &natsconnection.DefaultRequestTimeout |
|
|
|
|
} |
|
|
|
|
v.Id = bus.id |
|
|
|
|
err := natsconnection.CallFuncWithRetry(func() error { |
|
|
|
|
return bus.Request(subject, v, vPtr, *singleTimeout) |
|
|
|
|
}, timeout) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { |
|
|
|
|
if bus.triggerPrefix == "" { |
|
|
|
|
// Trigger disabled.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
trigger := &BusTrigger{ |
|
|
|
|
Id: bus.id, |
|
|
|
|
Name: name, |
|
|
|
|
@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
@@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
|
|
|
|
|
if pipeline != nil { |
|
|
|
|
trigger.Pipeline = pipeline.GetID() |
|
|
|
|
} |
|
|
|
|
entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} |
|
|
|
|
entry := &busQueueEntry{BusSubjectTrigger(bus.triggerPrefix, name), trigger} |
|
|
|
|
select { |
|
|
|
|
case bus.triggerQueue <- entry: |
|
|
|
|
// sent ok
|
|
|
|
|
@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
@@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) PrefixSubject(sub string) string { |
|
|
|
|
return fmt.Sprintf("%s.%s", bus.prefix, sub) |
|
|
|
|
return fmt.Sprintf("%s.%s", bus.triggerPrefix, sub) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { |
|
|
|
|
|