@ -49,6 +49,8 @@ type BusManager interface {
@@ -49,6 +49,8 @@ type BusManager interface {
Start ( )
Publish ( subject string , v interface { } ) error
Request ( subject string , v interface { } , vPtr interface { } , timeout time . Duration ) error
BusRequest ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration ) error
BusRequestWithRetry ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration , singleTimeout * time . Duration ) error
Trigger ( name , from , payload string , data interface { } , pipeline * Pipeline ) error
Subscribe ( subject string , cb nats . Handler ) ( * nats . Subscription , error )
BindRecvChan ( subject string , channel interface { } ) ( * nats . Subscription , error )
@ -68,6 +70,14 @@ type BusTrigger struct {
@@ -68,6 +70,14 @@ type BusTrigger struct {
Pipeline string ` json:",omitempty" `
}
// A BusRequest is a simple generic to allow sending arbitrary
// Requests to the bus.
type BusRequest struct {
Id string
Type string
Data interface { } ` json:",omitempty" `
}
// BusSubjectTrigger returns the bus subject for trigger payloads.
func BusSubjectTrigger ( prefix , suffix string ) string {
return fmt . Sprintf ( "%s.%s" , prefix , suffix )
@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string {
@@ -76,11 +86,12 @@ func BusSubjectTrigger(prefix, suffix string) string {
// NewBusManager creates and initializes a new BusMager with the
// provided flags for NATS support. It is intended to connect the
// backend bus with a easy to use API to send and receive bus data.
func NewBusManager ( apiConsumer ChannellingAPIConsumer , id string , useNats bool , s ubjectPrefix string ) BusManager {
func NewBusManager ( apiConsumer ChannellingAPIConsumer , id string , useNats bool , triggerS ubjectPrefix string ) BusManager {
var b BusManager
var err error
if useNats {
b , err = newNatsBus ( apiConsumer , id , subjectPrefix )
log . Println ( "NATS enabled" , useNats , id )
b , err = newNatsBus ( apiConsumer , id , triggerSubjectPrefix )
if err == nil {
log . Println ( "NATS bus connected" )
} else {
@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim
@@ -111,6 +122,16 @@ func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, tim
return nil
}
func ( bus * noopBus ) BusRequest ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration ) error {
v . Id = bus . id
return nil
}
func ( bus * noopBus ) BusRequestWithRetry ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration , singleTimeout * time . Duration ) error {
v . Id = bus . id
return nil
}
func ( bus * noopBus ) Trigger ( name , from , payload string , data interface { } , pipeline * Pipeline ) error {
return nil
}
@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink {
@@ -137,24 +158,21 @@ func (bus *noopBus) CreateSink(id string) Sink {
type natsBus struct {
ChannellingAPIConsumer
id string
prefix string
ec * natsconnection . EncodedConnection
triggerQueue chan * busQueueEntry
id string
triggerPrefix string
ec * natsconnection . EncodedConnection
triggerQueue chan * busQueueEntry
}
func newNatsBus ( apiConsumer ChannellingAPIConsumer , id , p refix string ) ( * natsBus , error ) {
func newNatsBus ( apiConsumer ChannellingAPIConsumer , id , triggerP refix string ) ( * natsBus , error ) {
ec , err := natsconnection . EstablishJSONEncodedConnection ( nil )
if err != nil {
return nil , err
}
if prefix == "" {
prefix = "channelling.trigger"
}
// Create buffered channel for outbound NATS data.
triggerQueue := make ( chan * busQueueEntry , 50 )
return & natsBus { apiConsumer , id , p refix, ec , triggerQueue } , nil
return & natsBus { apiConsumer , id , triggerP refix, ec , triggerQueue } , nil
}
func ( bus * natsBus ) Start ( ) {
@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim
@@ -171,7 +189,28 @@ func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, tim
return bus . ec . Request ( subject , v , vPtr , timeout )
}
func ( bus * natsBus ) BusRequest ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration ) error {
v . Id = bus . id
return bus . Request ( subject , v , vPtr , timeout )
}
func ( bus * natsBus ) BusRequestWithRetry ( subject string , v * BusRequest , vPtr interface { } , timeout time . Duration , singleTimeout * time . Duration ) error {
if singleTimeout == nil {
singleTimeout = & natsconnection . DefaultRequestTimeout
}
v . Id = bus . id
err := natsconnection . CallFuncWithRetry ( func ( ) error {
return bus . Request ( subject , v , vPtr , * singleTimeout )
} , timeout )
return err
}
func ( bus * natsBus ) Trigger ( name , from , payload string , data interface { } , pipeline * Pipeline ) ( err error ) {
if bus . triggerPrefix == "" {
// Trigger disabled.
return nil
}
trigger := & BusTrigger {
Id : bus . id ,
Name : name ,
@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
@@ -182,7 +221,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
if pipeline != nil {
trigger . Pipeline = pipeline . GetID ( )
}
entry := & busQueueEntry { BusSubjectTrigger ( bus . p refix, name ) , trigger }
entry := & busQueueEntry { BusSubjectTrigger ( bus . triggerP refix, name ) , trigger }
select {
case bus . triggerQueue <- entry :
// sent ok
@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
@@ -195,7 +234,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeli
}
func ( bus * natsBus ) PrefixSubject ( sub string ) string {
return fmt . Sprintf ( "%s.%s" , bus . p refix, sub )
return fmt . Sprintf ( "%s.%s" , bus . triggerP refix, sub )
}
func ( bus * natsBus ) Subscribe ( subject string , cb nats . Handler ) ( * nats . Subscription , error ) {