diff --git a/Godeps b/Godeps index 462a3358..ee5b22fe 100644 --- a/Godeps +++ b/Godeps @@ -9,3 +9,4 @@ github.com/strukturag/httputils httputils_v012 github.com/strukturag/phoenix phoenix_v0133 github.com/strukturag/sloth v0.9.2 github.com/dlintw/goconf dcc070983490608a14480e3bf943bad464785df5 +github.com/nats-io/nats v1.1.6 diff --git a/server.conf.in b/server.conf.in index b57031a7..b9727df6 100644 --- a/server.conf.in +++ b/server.conf.in @@ -194,3 +194,12 @@ enabled = false ; enable userid creation/registration. Users are created according the settings ; of the currently configured mode (see above). ;allowRegistration = false + +[nats] +; Set to true, to enable triggering channelling events via NATS +;channelling_trigger = false +;channelling_trigger_subject = channelling.trigger +; NATS server URL +;url = nats://127.0.0.1:4222 +; NATS connect establish timeout in seconds +;establishTimeout = 60 \ No newline at end of file diff --git a/src/app/spreed-webrtc-server/bus_manager.go b/src/app/spreed-webrtc-server/bus_manager.go new file mode 100644 index 00000000..e302bd72 --- /dev/null +++ b/src/app/spreed-webrtc-server/bus_manager.go @@ -0,0 +1,109 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package main + +import ( + "fmt" + "log" + + "github.com/nats-io/nats" +) + +const ( + BusManagerOffer = "offer" + BusManagerAnswer = "answer" + BusManagerBye = "bye" + BusManagerConnect = "connect" + BusManagerDisconnect = "disconnect" + BusManagerAuth = "auth" +) + +type BusManager interface { + Trigger(name, from, payload string, data interface{}) error +} + +type BusEvent struct { + Name string + From string + Payload string `json:",omitempty"` + Data interface{} `json:",omitempty"` +} + +type busManager struct { + BusManager +} + +func NewBusManager(useNats bool, subjectPrefix string) BusManager { + var b BusManager + if useNats { + var err error + b, err = newNatsBus(subjectPrefix) + if err == nil { + log.Println("Nats bus connected") + } else { + log.Println("Error connecting nats bus", err) + b = &noopBus{} + } + } else { + b = &noopBus{} + } + return &busManager{b} +} + +type noopBus struct { +} + +func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error { + return nil +} + +type natsBus struct { + prefix string + ec *nats.EncodedConn +} + +func newNatsBus(prefix string) (*natsBus, error) { + ec, err := EstablishNatsConnection(nil) + if err != nil { + return nil, err + } + if prefix == "" { + prefix = "channelling.trigger" + } + return &natsBus{prefix, ec}, nil +} + +func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { + if bus.ec != nil { + event := &BusEvent{ + Name: name, + From: from, + Payload: payload, + Data: data, + } + err = bus.ec.Publish(fmt.Sprintf("%s.%s", bus.prefix, name), event) + if err != nil { + log.Println("Failed to trigger NATS event", err) + } + } + return err +} diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go index f44e4671..7820a87c 100644 --- a/src/app/spreed-webrtc-server/channelling_api.go +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -46,9 +46,10 @@ type channellingAPI struct { ContactManager TurnDataCreator Unicaster + BusManager } -func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI { +func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, busManager BusManager) ChannellingAPI { return &channellingAPI{ config, roomStatus, @@ -58,16 +59,22 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco contactManager, turnDataCreator, unicaster, + busManager, } } func (api *channellingAPI) OnConnect(client Client, session *Session) (interface{}, error) { api.Unicaster.OnConnect(client, session) - return api.HandleSelf(session) + self, err := api.HandleSelf(session) + if err == nil { + api.Trigger(BusManagerConnect, session.Id, "", nil) + } + return self, err } func (api *channellingAPI) OnDisconnect(client Client, session *Session) { api.Unicaster.OnDisconnect(client, session) + api.Trigger(BusManagerDisconnect, session.Id, "", nil) } func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *DataIncoming) (interface{}, error) { @@ -85,6 +92,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data log.Println("Received invalid offer message.", msg) break } + api.Trigger(BusManagerOffer, session.Id, msg.Offer.To, nil) // TODO(longsleep): Validate offer session.Unicast(msg.Offer.To, msg.Offer) @@ -101,6 +109,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data log.Println("Received invalid answer message.", msg) break } + api.Trigger(BusManagerAnswer, session.Id, msg.Answer.To, nil) // TODO(longsleep): Validate Answer session.Unicast(msg.Answer.To, msg.Answer) @@ -117,6 +126,7 @@ func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *Data log.Println("Received invalid bye message.", msg) break } + api.Trigger(BusManagerBye, session.Id, msg.Bye.To, nil) session.Unicast(msg.Bye.To, msg.Bye) case "Status": @@ -226,6 +236,7 @@ func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionTok log.Println("Authentication success", session.Userid()) self, err := api.HandleSelf(session) if err == nil { + api.Trigger("Auth", session.Id, session.Userid(), nil) session.BroadcastStatus() } diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index ee97ef4b..e32a0f0c 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -306,6 +306,20 @@ func runner(runtime phoenix.Runtime) error { tokenProvider = TokenFileProvider(tokenFile) } + // Nats pub/sub supports. + natsChannellingTrigger, _ := runtime.GetBool("nats", "channelling_trigger") + natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") + if natsURL, err := runtime.GetString("nats", "url"); err == nil { + if natsURL != "" { + DefaultNatsURL = natsURL + } + } + if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { + if natsEstablishTimeout != 0 { + DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second + } + } + // Load remaining configuration items. config = NewConfig(runtime, tokenProvider != nil) @@ -407,7 +421,8 @@ func runner(runtime phoenix.Runtime) error { tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := NewStatsManager(hub, roomManager, sessionManager) - channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub) + busManager := NewBusManager(natsChannellingTrigger, natsChannellingTriggerSubject) + channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) // Add handlers. r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) diff --git a/src/app/spreed-webrtc-server/nats.go b/src/app/spreed-webrtc-server/nats.go new file mode 100644 index 00000000..31db464e --- /dev/null +++ b/src/app/spreed-webrtc-server/nats.go @@ -0,0 +1,65 @@ +package main + +import ( + "errors" + "log" + "time" + + "github.com/nats-io/nats" +) + +var DefaultNatsEstablishTimeout = 60 * time.Second +var DefaultNatsURL = nats.DefaultURL + +func NewNatsConnection() (*nats.EncodedConn, error) { + nc, err := nats.Connect(DefaultNatsURL) + if err != nil { + return nil, err + } + ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + return nil, err + } + return ec, nil +} + +func EstablishNatsConnection(timeout *time.Duration) (*nats.EncodedConn, error) { + if timeout == nil { + timeout = &DefaultNatsEstablishTimeout + } + connch := make(chan *nats.EncodedConn, 1) + errch := make(chan error, 1) + go func() { + notify := true + for { + ec, err := NewNatsConnection() + if err == nil { + connch <- ec + break + } + switch err { + case nats.ErrTimeout: + fallthrough + case nats.ErrNoServers: + if notify { + notify = false + log.Println("Waiting for NATS server to become available") + } + time.Sleep(1 * time.Second) + continue + default: + errch <- err + break + } + } + }() + + select { + case conn := <-connch: + return conn, nil + case err := <-errch: + return nil, err + case <-time.After(*timeout): + return nil, errors.New("NATS connection: timeout") + } +}