Browse Source

Added support for NATS pub/sub messaging to trigger channeling events for external services.

pull/265/head
Simon Eisenmann 9 years ago
parent
commit
e33796f83f
  1. 1
      Godeps
  2. 9
      server.conf.in
  3. 109
      src/app/spreed-webrtc-server/bus_manager.go
  4. 15
      src/app/spreed-webrtc-server/channelling_api.go
  5. 17
      src/app/spreed-webrtc-server/main.go
  6. 65
      src/app/spreed-webrtc-server/nats.go

1
Godeps

@ -9,3 +9,4 @@ github.com/strukturag/httputils httputils_v012 @@ -9,3 +9,4 @@ github.com/strukturag/httputils httputils_v012
github.com/strukturag/phoenix phoenix_v0133
github.com/strukturag/sloth v0.9.2
github.com/dlintw/goconf dcc070983490608a14480e3bf943bad464785df5
github.com/nats-io/nats v1.1.6

9
server.conf.in

@ -194,3 +194,12 @@ enabled = false @@ -194,3 +194,12 @@ enabled = false
; enable userid creation/registration. Users are created according the settings
; of the currently configured mode (see above).
;allowRegistration = false
[nats]
; Set to true, to enable triggering channelling events via NATS
;channelling_trigger = false
;channelling_trigger_subject = channelling.trigger
; NATS server URL
;url = nats://127.0.0.1:4222
; NATS connect establish timeout in seconds
;establishTimeout = 60

109
src/app/spreed-webrtc-server/bus_manager.go

@ -0,0 +1,109 @@ @@ -0,0 +1,109 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"fmt"
"log"
"github.com/nats-io/nats"
)
const (
BusManagerOffer = "offer"
BusManagerAnswer = "answer"
BusManagerBye = "bye"
BusManagerConnect = "connect"
BusManagerDisconnect = "disconnect"
BusManagerAuth = "auth"
)
type BusManager interface {
Trigger(name, from, payload string, data interface{}) error
}
type BusEvent struct {
Name string
From string
Payload string `json:",omitempty"`
Data interface{} `json:",omitempty"`
}
type busManager struct {
BusManager
}
func NewBusManager(useNats bool, subjectPrefix string) BusManager {
var b BusManager
if useNats {
var err error
b, err = newNatsBus(subjectPrefix)
if err == nil {
log.Println("Nats bus connected")
} else {
log.Println("Error connecting nats bus", err)
b = &noopBus{}
}
} else {
b = &noopBus{}
}
return &busManager{b}
}
type noopBus struct {
}
func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error {
return nil
}
type natsBus struct {
prefix string
ec *nats.EncodedConn
}
func newNatsBus(prefix string) (*natsBus, error) {
ec, err := EstablishNatsConnection(nil)
if err != nil {
return nil, err
}
if prefix == "" {
prefix = "channelling.trigger"
}
return &natsBus{prefix, ec}, nil
}
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) {
if bus.ec != nil {
event := &BusEvent{
Name: name,
From: from,
Payload: payload,
Data: data,
}
err = bus.ec.Publish(fmt.Sprintf("%s.%s", bus.prefix, name), event)
if err != nil {
log.Println("Failed to trigger NATS event", err)
}
}
return err
}

15
src/app/spreed-webrtc-server/channelling_api.go

@ -46,9 +46,10 @@ type channellingAPI struct { @@ -46,9 +46,10 @@ type channellingAPI struct {
ContactManager
TurnDataCreator
Unicaster
BusManager
}
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI {
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, busManager BusManager) ChannellingAPI {
return &channellingAPI{
config,
roomStatus,
@ -58,16 +59,22 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco @@ -58,16 +59,22 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco
contactManager,
turnDataCreator,
unicaster,
busManager,
}
}
func (api *channellingAPI) OnConnect(client Client, session *Session) (interface{}, error) {
api.Unicaster.OnConnect(client, session)
return api.HandleSelf(session)
self, err := api.HandleSelf(session)
if err == nil {
api.Trigger(BusManagerConnect, session.Id, "", nil)
}
return self, err
}
func (api *channellingAPI) OnDisconnect(client Client, session *Session) {
api.Unicaster.OnDisconnect(client, session)
api.Trigger(BusManagerDisconnect, session.Id, "", nil)
}
func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *DataIncoming) (interface{}, error) {
@ -85,6 +92,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data @@ -85,6 +92,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data
log.Println("Received invalid offer message.", msg)
break
}
api.Trigger(BusManagerOffer, session.Id, msg.Offer.To, nil)
// TODO(longsleep): Validate offer
session.Unicast(msg.Offer.To, msg.Offer)
@ -101,6 +109,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data @@ -101,6 +109,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data
log.Println("Received invalid answer message.", msg)
break
}
api.Trigger(BusManagerAnswer, session.Id, msg.Answer.To, nil)
// TODO(longsleep): Validate Answer
session.Unicast(msg.Answer.To, msg.Answer)
@ -117,6 +126,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data @@ -117,6 +126,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data
log.Println("Received invalid bye message.", msg)
break
}
api.Trigger(BusManagerBye, session.Id, msg.Bye.To, nil)
session.Unicast(msg.Bye.To, msg.Bye)
case "Status":
@ -226,6 +236,7 @@ func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionTok @@ -226,6 +236,7 @@ func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionTok
log.Println("Authentication success", session.Userid())
self, err := api.HandleSelf(session)
if err == nil {
api.Trigger("Auth", session.Id, session.Userid(), nil)
session.BroadcastStatus()
}

17
src/app/spreed-webrtc-server/main.go

@ -306,6 +306,20 @@ func runner(runtime phoenix.Runtime) error { @@ -306,6 +306,20 @@ func runner(runtime phoenix.Runtime) error {
tokenProvider = TokenFileProvider(tokenFile)
}
// Nats pub/sub supports.
natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger")
natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject")
if natsURL, err := runtime.GetString("nats", "url"); err == nil {
if natsURL != "" {
DefaultNatsURL = natsURL
}
}
if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil {
if natsEstablishTimeout != 0 {
DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
}
}
// Load remaining configuration items.
config = NewConfig(runtime, tokenProvider != nil)
@ -407,7 +421,8 @@ func runner(runtime phoenix.Runtime) error { @@ -407,7 +421,8 @@ func runner(runtime phoenix.Runtime) error {
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub)
busManager := NewBusManager(natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager)
// Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))

65
src/app/spreed-webrtc-server/nats.go

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
package main
import (
"errors"
"log"
"time"
"github.com/nats-io/nats"
)
var DefaultNatsEstablishTimeout = 60 * time.Second
var DefaultNatsURL = nats.DefaultURL
func NewNatsConnection() (*nats.EncodedConn, error) {
nc, err := nats.Connect(DefaultNatsURL)
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return ec, nil
}
func EstablishNatsConnection(timeout *time.Duration) (*nats.EncodedConn, error) {
if timeout == nil {
timeout = &DefaultNatsEstablishTimeout
}
connch := make(chan *nats.EncodedConn, 1)
errch := make(chan error, 1)
go func() {
notify := true
for {
ec, err := NewNatsConnection()
if err == nil {
connch <- ec
break
}
switch err {
case nats.ErrTimeout:
fallthrough
case nats.ErrNoServers:
if notify {
notify = false
log.Println("Waiting for NATS server to become available")
}
time.Sleep(1 * time.Second)
continue
default:
errch <- err
break
}
}
}()
select {
case conn := <-connch:
return conn, nil
case err := <-errch:
return nil, err
case <-time.After(*timeout):
return nil, errors.New("NATS connection: timeout")
}
}
Loading…
Cancel
Save