Browse Source

Merge pull request #265 from longsleep/nats-triggers

Added support for NATS pub/sub messaging to trigger channeling events…
pull/266/head
Simon Eisenmann 9 years ago
parent
commit
1a9a2b406e
  1. 1
      Godeps
  2. 9
      server.conf.in
  3. 120
      src/app/spreed-webrtc-server/bus_manager.go
  4. 4
      src/app/spreed-webrtc-server/channelling.go
  5. 35
      src/app/spreed-webrtc-server/channelling_api.go
  6. 17
      src/app/spreed-webrtc-server/main.go
  7. 77
      src/app/spreed-webrtc-server/nats.go

1
Godeps

@ -9,3 +9,4 @@ github.com/strukturag/httputils httputils_v012 @@ -9,3 +9,4 @@ github.com/strukturag/httputils httputils_v012
github.com/strukturag/phoenix phoenix_v0133
github.com/strukturag/sloth v0.9.2
github.com/dlintw/goconf dcc070983490608a14480e3bf943bad464785df5
github.com/nats-io/nats v1.1.6

9
server.conf.in

@ -194,3 +194,12 @@ enabled = false @@ -194,3 +194,12 @@ enabled = false
; enable userid creation/registration. Users are created according the settings
; of the currently configured mode (see above).
;allowRegistration = false
[nats]
; Set to true, to enable triggering channelling events via NATS
;channelling_trigger = false
;channelling_trigger_subject = channelling.trigger
; NATS server URL
;url = nats://127.0.0.1:4222
; NATS connect establish timeout in seconds
;establishTimeout = 60

120
src/app/spreed-webrtc-server/bus_manager.go

@ -0,0 +1,120 @@ @@ -0,0 +1,120 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"fmt"
"log"
"github.com/nats-io/nats"
)
const (
BusManagerOffer = "offer"
BusManagerAnswer = "answer"
BusManagerBye = "bye"
BusManagerConnect = "connect"
BusManagerDisconnect = "disconnect"
BusManagerAuth = "auth"
)
// A BusManager provides the API to interact with a bus.
type BusManager interface {
Trigger(name, from, payload string, data interface{}) error
}
// A BusTrigger is a container to serialize trigger events
// for the bus backend.
type BusTrigger struct {
Name string
From string
Payload string `json:",omitempty"`
Data interface{} `json:",omitempty"`
}
// BusSubjectTrigger returns the bus subject for trigger payloads.
func BusSubjectTrigger(prefix, suffix string) string {
return fmt.Sprintf("%s.%s", prefix, suffix)
}
type busManager struct {
BusManager
}
// NewBusManager creates and initializes a new BusMager with the
// provided flags for NATS support. It is intended to connect the
// backend bus with a easy to use API to send and receive bus data.
func NewBusManager(useNats bool, subjectPrefix string) BusManager {
var b BusManager
if useNats {
var err error
b, err = newNatsBus(subjectPrefix)
if err == nil {
log.Println("Nats bus connected")
} else {
log.Println("Error connecting nats bus", err)
b = &noopBus{}
}
} else {
b = &noopBus{}
}
return &busManager{b}
}
type noopBus struct {
}
func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error {
return nil
}
type natsBus struct {
prefix string
ec *nats.EncodedConn
}
func newNatsBus(prefix string) (*natsBus, error) {
ec, err := EstablishNatsConnection(nil)
if err != nil {
return nil, err
}
if prefix == "" {
prefix = "channelling.trigger"
}
return &natsBus{prefix, ec}, nil
}
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) {
if bus.ec != nil {
trigger := &BusTrigger{
Name: name,
From: from,
Payload: payload,
Data: data,
}
err = bus.ec.Publish(BusSubjectTrigger(bus.prefix, name), trigger)
if err != nil {
log.Println("Failed to trigger NATS event", err)
}
}
return err
}

4
src/app/spreed-webrtc-server/channelling.go

@ -63,7 +63,7 @@ type DataRoom struct { @@ -63,7 +63,7 @@ type DataRoom struct {
type DataOffer struct {
Type string
To string
Offer interface{}
Offer map[string]interface{}
}
type DataCandidate struct {
@ -75,7 +75,7 @@ type DataCandidate struct { @@ -75,7 +75,7 @@ type DataCandidate struct {
type DataAnswer struct {
Type string
To string
Answer interface{}
Answer map[string]interface{}
}
type DataSelf struct {

35
src/app/spreed-webrtc-server/channelling_api.go

@ -46,9 +46,13 @@ type channellingAPI struct { @@ -46,9 +46,13 @@ type channellingAPI struct {
ContactManager
TurnDataCreator
Unicaster
BusManager
}
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI {
// NewChannellingAPI creates and initializes a new ChannellingAPI using
// various other services for initialization. It is intended to handle
// incoming and outgoing channeling API events from clients.
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, busManager BusManager) ChannellingAPI {
return &channellingAPI{
config,
roomStatus,
@ -58,16 +62,22 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco @@ -58,16 +62,22 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco
contactManager,
turnDataCreator,
unicaster,
busManager,
}
}
func (api *channellingAPI) OnConnect(client Client, session *Session) (interface{}, error) {
api.Unicaster.OnConnect(client, session)
return api.HandleSelf(session)
self, err := api.HandleSelf(session)
if err == nil {
api.Trigger(BusManagerConnect, session.Id, "", nil)
}
return self, err
}
func (api *channellingAPI) OnDisconnect(client Client, session *Session) {
api.Unicaster.OnDisconnect(client, session)
api.Trigger(BusManagerDisconnect, session.Id, "", nil)
}
func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *DataIncoming) (interface{}, error) {
@ -81,28 +91,35 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data @@ -81,28 +91,35 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data
return api.HandleHello(session, msg.Hello, sender)
case "Offer":
if msg.Offer == nil {
if msg.Offer == nil || msg.Offer.Offer == nil {
log.Println("Received invalid offer message.", msg)
break
}
if _, ok := msg.Offer.Offer["_token"]; !ok {
// Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers.
api.Trigger(BusManagerOffer, session.Id, msg.Offer.To, nil)
}
// TODO(longsleep): Validate offer
session.Unicast(msg.Offer.To, msg.Offer)
case "Candidate":
if msg.Candidate == nil {
if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg)
break
}
// TODO(longsleep): Validate candidate
session.Unicast(msg.Candidate.To, msg.Candidate)
case "Answer":
if msg.Answer == nil {
if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg)
break
}
if _, ok := msg.Answer.Answer["_token"]; !ok {
// Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers.
api.Trigger(BusManagerAnswer, session.Id, msg.Answer.To, nil)
}
// TODO(longsleep): Validate Answer
session.Unicast(msg.Answer.To, msg.Answer)
case "Users":
return api.HandleUsers(session)
@ -117,6 +134,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data @@ -117,6 +134,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data
log.Println("Received invalid bye message.", msg)
break
}
api.Trigger(BusManagerBye, session.Id, msg.Bye.To, nil)
session.Unicast(msg.Bye.To, msg.Bye)
case "Status":
@ -226,6 +244,7 @@ func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionTok @@ -226,6 +244,7 @@ func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionTok
log.Println("Authentication success", session.Userid())
self, err := api.HandleSelf(session)
if err == nil {
api.Trigger("Auth", session.Id, session.Userid(), nil)
session.BroadcastStatus()
}

17
src/app/spreed-webrtc-server/main.go

@ -306,6 +306,20 @@ func runner(runtime phoenix.Runtime) error { @@ -306,6 +306,20 @@ func runner(runtime phoenix.Runtime) error {
tokenProvider = TokenFileProvider(tokenFile)
}
// Nats pub/sub supports.
natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger")
natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject")
if natsURL, err := runtime.GetString("nats", "url"); err == nil {
if natsURL != "" {
DefaultNatsURL = natsURL
}
}
if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil {
if natsEstablishTimeout != 0 {
DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
}
}
// Load remaining configuration items.
config = NewConfig(runtime, tokenProvider != nil)
@ -407,7 +421,8 @@ func runner(runtime phoenix.Runtime) error { @@ -407,7 +421,8 @@ func runner(runtime phoenix.Runtime) error {
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub)
busManager := NewBusManager(natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager)
// Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))

77
src/app/spreed-webrtc-server/nats.go

@ -0,0 +1,77 @@ @@ -0,0 +1,77 @@
package main
import (
"errors"
"log"
"time"
"github.com/nats-io/nats"
)
// DefaultNatsEstablishTimeout is the default timeout for
// calls to EstablishNatsConnection.
var DefaultNatsEstablishTimeout = 60 * time.Second
// DefaultNatsURL is the default NATS server URL used for
// calls to NewNatsConnection and EstablishNatsConnection.
var DefaultNatsURL = nats.DefaultURL
// NewNatsConnection creates a connetion to the default NATS server
// and tries to establish the connection. It returns the connection
// and any connection error encountered.
func NewNatsConnection() (*nats.EncodedConn, error) {
nc, err := nats.Connect(DefaultNatsURL)
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return ec, nil
}
// EstablishNatsConnection is a blocking way to create and establish
// connection to the default NATS server. The function will only return
// after a timeout has reached or a connection has been established. It
// returns the connection and and any timeout error encountered.
func EstablishNatsConnection(timeout *time.Duration) (*nats.EncodedConn, error) {
if timeout == nil {
timeout = &DefaultNatsEstablishTimeout
}
connch := make(chan *nats.EncodedConn, 1)
errch := make(chan error, 1)
go func() {
notify := true
for {
ec, err := NewNatsConnection()
if err == nil {
connch <- ec
break
}
switch err {
case nats.ErrTimeout:
fallthrough
case nats.ErrNoServers:
if notify {
notify = false
log.Println("Waiting for NATS server to become available")
}
time.Sleep(1 * time.Second)
continue
default:
errch <- err
break
}
}
}()
select {
case conn := <-connch:
return conn, nil
case err := <-errch:
return nil, err
case <-time.After(*timeout):
return nil, errors.New("NATS connection: timeout")
}
}
Loading…
Cancel
Save