You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
7.7 KiB
306 lines
7.7 KiB
/*
 * Spreed WebRTC.
 * Copyright (C) 2013-2015 struktur AG
 *
 * This file is part of Spreed WebRTC.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 */
|
|
|
package channelling |
|
|
|
import ( |
|
"errors" |
|
"fmt" |
|
"log" |
|
"sync" |
|
"time" |
|
|
|
"github.com/nats-io/nats" |
|
|
|
"github.com/strukturag/spreed-webrtc/go/natsconnection" |
|
) |
|
|
|
// Event names understood by BusManager.Trigger. The name doubles as the
// last element of the subject the trigger is published on.
//
// BusManagerStartup is sent by the NATS bus itself when it is started.
// NOTE(review): the remaining names are presumably triggered by callers
// outside of this file - confirm against the call sites.
const (
	BusManagerStartup    = "startup"
	BusManagerOffer      = "offer"
	BusManagerAnswer     = "answer"
	BusManagerBye        = "bye"
	BusManagerConnect    = "connect"
	BusManagerDisconnect = "disconnect"
	BusManagerSession    = "session"
)
|
|
|
// A BusManager provides the API to interact with a bus. |
|
type BusManager interface { |
|
ChannellingAPIConsumer |
|
Start() |
|
Publish(subject string, v interface{}) error |
|
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error |
|
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error |
|
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) |
|
BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) |
|
BindSendChan(subject string, channel interface{}) error |
|
PrefixSubject(string) string |
|
CreateSink(string) Sink |
|
} |
|
|
|
// A BusTrigger is a container to serialize trigger events
// for the bus backend.
//
// Id holds the id of the bus which sent the trigger, Name the event name
// and From the sender given by the caller. Payload, Data and Pipeline are
// optional and left out of the JSON encoding when empty; Pipeline carries
// the ID of the pipeline passed to Trigger, if any.
type BusTrigger struct {
	Id       string
	Name     string
	From     string
	Payload  string      `json:",omitempty"`
	Data     interface{} `json:",omitempty"`
	Pipeline string      `json:",omitempty"`
}
|
|
|
// BusSubjectTrigger returns the bus subject for trigger payloads, which is
// prefix and suffix joined by a single dot. Neither part is validated, so
// empty parts still yield the dot.
func BusSubjectTrigger(prefix, suffix string) string {
	// Plain concatenation gives the same result as formatting two strings
	// with "%s.%s" without going through fmt.
	return prefix + "." + suffix
}
|
|
|
// NewBusManager creates and initializes a new BusMager with the |
|
// provided flags for NATS support. It is intended to connect the |
|
// backend bus with a easy to use API to send and receive bus data. |
|
func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { |
|
var b BusManager |
|
var err error |
|
if useNats { |
|
b, err = newNatsBus(apiConsumer, id, subjectPrefix) |
|
if err == nil { |
|
log.Println("NATS bus connected") |
|
} else { |
|
log.Println("Error connecting NATS bus", err) |
|
b = &noopBus{apiConsumer, id} |
|
} |
|
} else { |
|
b = &noopBus{apiConsumer, id} |
|
} |
|
|
|
return b |
|
} |
|
|
|
type noopBus struct { |
|
ChannellingAPIConsumer |
|
id string |
|
} |
|
|
|
func (bus *noopBus) Start() { |
|
// noop |
|
} |
|
|
|
func (bus *noopBus) Publish(subject string, v interface{}) error { |
|
return nil |
|
} |
|
|
|
func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error { |
|
return nil |
|
} |
|
|
|
func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { |
|
return nil |
|
} |
|
|
|
func (bus *noopBus) PrefixSubject(subject string) string { |
|
return subject |
|
} |
|
|
|
func (bus *noopBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) { |
|
return nil, nil |
|
} |
|
|
|
func (bus *noopBus) BindSendChan(subject string, channel interface{}) error { |
|
return nil |
|
} |
|
|
|
func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { |
|
return nil, nil |
|
} |
|
|
|
func (bus *noopBus) CreateSink(id string) Sink { |
|
return nil |
|
} |
|
|
|
type natsBus struct { |
|
ChannellingAPIConsumer |
|
id string |
|
prefix string |
|
ec *natsconnection.EncodedConnection |
|
triggerQueue chan *busQueueEntry |
|
} |
|
|
|
func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { |
|
ec, err := natsconnection.EstablishJSONEncodedConnection(nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if prefix == "" { |
|
prefix = "channelling.trigger" |
|
} |
|
// Create buffered channel for outbound NATS data. |
|
triggerQueue := make(chan *busQueueEntry, 50) |
|
|
|
return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil |
|
} |
|
|
|
func (bus *natsBus) Start() { |
|
// Start go routine to process outbount NATS publishing. |
|
go chPublish(bus.ec, bus.triggerQueue) |
|
bus.Trigger(BusManagerStartup, bus.id, "", nil, nil) |
|
} |
|
|
|
func (bus *natsBus) Publish(subject string, v interface{}) error { |
|
return bus.ec.Publish(subject, v) |
|
} |
|
|
|
func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error { |
|
return bus.ec.Request(subject, v, vPtr, timeout) |
|
} |
|
|
|
func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { |
|
trigger := &BusTrigger{ |
|
Id: bus.id, |
|
Name: name, |
|
From: from, |
|
Payload: payload, |
|
Data: data, |
|
} |
|
if pipeline != nil { |
|
trigger.Pipeline = pipeline.GetID() |
|
} |
|
entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} |
|
select { |
|
case bus.triggerQueue <- entry: |
|
// sent ok |
|
default: |
|
log.Println("Failed to queue NATS event - queue full?") |
|
err = errors.New("NATS trigger queue full") |
|
} |
|
|
|
return err |
|
} |
|
|
|
func (bus *natsBus) PrefixSubject(sub string) string { |
|
return fmt.Sprintf("%s.%s", bus.prefix, sub) |
|
} |
|
|
|
func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { |
|
return bus.ec.Subscribe(subject, cb) |
|
} |
|
|
|
func (bus *natsBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) { |
|
return bus.ec.BindRecvChan(subject, channel) |
|
} |
|
|
|
func (bus *natsBus) BindSendChan(subject string, channel interface{}) error { |
|
return bus.ec.BindSendChan(subject, channel) |
|
} |
|
|
|
func (bus *natsBus) CreateSink(id string) (sink Sink) { |
|
sink = newNatsSink(bus, id) |
|
return |
|
} |
|
|
|
// busQueueEntry is a single queued publish: data is what gets published
// and subject is where it is published to.
type busQueueEntry struct {
	subject string
	data    interface{}
}
|
|
|
func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntry)) { |
|
for { |
|
entry := <-channel |
|
err := ec.Publish(entry.subject, entry.data) |
|
if err != nil { |
|
log.Println("Failed to publish to NATS", entry.subject, err) |
|
} |
|
} |
|
} |
|
|
|
type natsSink struct { |
|
sync.RWMutex |
|
id string |
|
bm BusManager |
|
closed bool |
|
SubjectOut string |
|
SubjectIn string |
|
sub *nats.Subscription |
|
sendQueue chan *DataSinkOutgoing |
|
} |
|
|
|
func newNatsSink(bm BusManager, id string) *natsSink { |
|
sink := &natsSink{ |
|
id: id, |
|
bm: bm, |
|
SubjectOut: bm.PrefixSubject(fmt.Sprintf("sink.%s.out", id)), |
|
SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)), |
|
} |
|
|
|
sink.sendQueue = make(chan *DataSinkOutgoing, 100) |
|
bm.BindSendChan(sink.SubjectOut, sink.sendQueue) |
|
|
|
return sink |
|
} |
|
|
|
func (sink *natsSink) Write(outgoing *DataSinkOutgoing) (err error) { |
|
if sink.Enabled() { |
|
log.Println("Sending via NATS sink", sink.SubjectOut, outgoing) |
|
sink.sendQueue <- outgoing |
|
} |
|
return err |
|
} |
|
|
|
func (sink *natsSink) Enabled() bool { |
|
sink.RLock() |
|
defer sink.RUnlock() |
|
return sink.closed == false |
|
} |
|
|
|
func (sink *natsSink) Close() { |
|
sink.Lock() |
|
defer sink.Unlock() |
|
if sink.sub != nil { |
|
err := sink.sub.Unsubscribe() |
|
if err != nil { |
|
log.Println("Failed to unsubscribe NATS sink", err) |
|
} else { |
|
sink.sub = nil |
|
} |
|
} |
|
sink.closed = true |
|
} |
|
|
|
func (sink *natsSink) Export() *DataSink { |
|
return &DataSink{ |
|
SubjectOut: sink.SubjectOut, |
|
SubjectIn: sink.SubjectIn, |
|
} |
|
} |
|
|
|
func (sink *natsSink) BindRecvChan(channel interface{}) (*nats.Subscription, error) { |
|
sink.Lock() |
|
defer sink.Unlock() |
|
if sink.sub != nil { |
|
sink.sub.Unsubscribe() |
|
sink.sub = nil |
|
} |
|
sub, err := sink.bm.BindRecvChan(sink.SubjectIn, channel) |
|
if err != nil { |
|
return nil, err |
|
} |
|
sink.sub = sub |
|
return sub, nil |
|
}
|
|
|