Browse Source

Moved NATS connecting helpers to own submodule.

pull/238/merge
Simon Eisenmann 9 years ago
parent
commit
447bc7fe8e
  1. 8
      go/channelling/bus_manager.go
  2. 96
      go/channelling/nats.go
  3. 137
      go/natsconnection/natsconnection.go
  4. 5
      src/app/spreed-webrtc-server/main.go

8
go/channelling/bus_manager.go

@ -26,7 +26,7 @@ import ( @@ -26,7 +26,7 @@ import (
"fmt"
"log"
"github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/natsconnection"
)
const (
@ -98,12 +98,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error @@ -98,12 +98,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error
type natsBus struct {
id string
prefix string
ec *nats.EncodedConn
ec *natsconnection.EncodedConnection
triggerQueue chan *busQueueEntry
}
func newNatsBus(id, prefix string) (*natsBus, error) {
ec, err := EstablishNatsConnection(nil)
ec, err := natsconnection.EstablishJSONEncodedConnection(nil)
if err != nil {
return nil, err
}
@ -145,7 +145,7 @@ type busQueueEntry struct { @@ -145,7 +145,7 @@ type busQueueEntry struct {
data interface{}
}
func chPublish(ec *nats.EncodedConn, channel chan (*busQueueEntry)) {
func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntry)) {
for {
entry := <-channel
err := ec.Publish(entry.subject, entry.data)

96
go/channelling/nats.go

@ -1,96 +0,0 @@ @@ -1,96 +0,0 @@
package channelling
import (
"errors"
"log"
"time"
"github.com/nats-io/nats"
)
// DefaultNatsEstablishTimeout is the default timeout for
// calls to EstablishNatsConnection.
var DefaultNatsEstablishTimeout = 60 * time.Second
// DefaultNatsURL is the default NATS server URL used for
// calls to NewNatsConnection and EstablishNatsConnection.
var DefaultNatsURL = nats.DefaultURL
// NewNatsConnection creates a connetion to the default NATS server
// and tries to establish the connection. It returns the connection
// and any connection error encountered.
func NewNatsConnection() (*nats.EncodedConn, error) {
opts := &nats.Options{
Url: DefaultNatsURL,
AllowReconnect: true,
MaxReconnect: -1, // Reconnect forever.
ReconnectWait: nats.DefaultReconnectWait,
Timeout: nats.DefaultTimeout,
PingInterval: nats.DefaultPingInterval,
MaxPingsOut: nats.DefaultMaxPingOut,
SubChanLen: nats.DefaultMaxChanLen,
ClosedCB: func(conn *nats.Conn) {
log.Println("NATS connection closed")
},
DisconnectedCB: func(conn *nats.Conn) {
log.Println("NATS disconnected")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("NATS reconnected")
},
}
nc, err := opts.Connect()
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return ec, nil
}
// EstablishNatsConnection is a blocking way to create and establish
// connection to the default NATS server. The function will only return
// after a timeout has reached or a connection has been established. It
// returns the connection and and any timeout error encountered.
func EstablishNatsConnection(timeout *time.Duration) (*nats.EncodedConn, error) {
if timeout == nil {
timeout = &DefaultNatsEstablishTimeout
}
connch := make(chan *nats.EncodedConn, 1)
errch := make(chan error, 1)
go func() {
notify := true
for {
ec, err := NewNatsConnection()
if err == nil {
connch <- ec
break
}
switch err {
case nats.ErrTimeout:
fallthrough
case nats.ErrNoServers:
if notify {
notify = false
log.Println("Waiting for NATS server to become available")
}
time.Sleep(1 * time.Second)
continue
default:
errch <- err
break
}
}
}()
select {
case conn := <-connch:
return conn, nil
case err := <-errch:
return nil, err
case <-time.After(*timeout):
return nil, errors.New("NATS connection: timeout")
}
}

137
go/natsconnection/natsconnection.go

@ -0,0 +1,137 @@ @@ -0,0 +1,137 @@
package natsconnection
import (
"errors"
"log"
"time"
"github.com/nats-io/nats"
)
// DefaultNatsEstablishTimeout is the default timeout for
// calls to EstablishNatsConnection.
var DefaultEstablishTimeout = 60 * time.Second
// DefaultNatsURL is the default NATS server URL used for
// calls to NewConnection and EstablishConnection.
var DefaultURL = nats.DefaultURL
// Connection implements the wrapped nats.Conn.
type Connection struct {
nats.Conn
}
// EncodedConnection implements the wrapped nats.EncodedConn.
type EncodedConnection struct {
nats.EncodedConn
}
// NewConnection creates a connetion to the default NATS server
// and tries to establish the connection. It returns the connection
// and any connection error encountered.
func NewConnection() (*Connection, error) {
opts := &nats.Options{
Url: DefaultURL,
AllowReconnect: true,
MaxReconnect: -1, // Reconnect forever.
ReconnectWait: nats.DefaultReconnectWait,
Timeout: nats.DefaultTimeout,
PingInterval: nats.DefaultPingInterval,
MaxPingsOut: nats.DefaultMaxPingOut,
SubChanLen: nats.DefaultMaxChanLen,
ClosedCB: func(conn *nats.Conn) {
log.Println("NATS connection closed")
},
DisconnectedCB: func(conn *nats.Conn) {
log.Println("NATS disconnected")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("NATS reconnected")
},
}
nc, err := opts.Connect()
if err != nil {
return nil, err
}
return &Connection{*nc}, nil
}
// NewJSONEncodedConnection creates a JSON-encoded connetion to the
// default NATS server and tries to establish the connection. It
// returns the JSON-encoded connection and any connection error
// encountered.
func NewJSONEncodedConnection() (*EncodedConnection, error) {
nc, err := NewConnection()
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(&nc.Conn, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return &EncodedConnection{*ec}, nil
}
// EstablishConnection is a blocking way to create and establish
// connection to the default NATS server. The function will only return
// after a timeout has reached or a connection has been established. It
// returns the connection and and any timeout error encountered.
func EstablishConnection(timeout *time.Duration) (*Connection, error) {
if timeout == nil {
timeout = &DefaultEstablishTimeout
}
connch := make(chan *Connection, 1)
errch := make(chan error, 1)
go func() {
notify := true
for {
nc, err := NewConnection()
if err == nil {
connch <- nc
break
}
switch err {
case nats.ErrTimeout:
fallthrough
case nats.ErrNoServers:
if notify {
notify = false
log.Println("Waiting for NATS server to become available")
}
time.Sleep(1 * time.Second)
continue
default:
errch <- err
break
}
}
}()
select {
case conn := <-connch:
return conn, nil
case err := <-errch:
return nil, err
case <-time.After(*timeout):
return nil, errors.New("NATS connection: timeout")
}
}
// EstablishJSONEncodedConnection is a blocking way to create and establish
// JSON-encoded connection to the default NATS server. The function will
// only return after a timeout has reached or a connection has been
// established. It returns the JSON-encoded connection and and any timeout
// error encountered.
func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, error) {
nc, err := EstablishConnection(timeout)
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(&nc.Conn, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return &EncodedConnection{*ec}, nil
}

5
src/app/spreed-webrtc-server/main.go

@ -41,6 +41,7 @@ import ( @@ -41,6 +41,7 @@ import (
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/strukturag/spreed-webrtc/go/channelling/api"
"github.com/strukturag/spreed-webrtc/go/channelling/server"
"github.com/strukturag/spreed-webrtc/go/natsconnection"
"github.com/gorilla/mux"
"github.com/strukturag/httputils"
@ -153,12 +154,12 @@ func runner(runtime phoenix.Runtime) error { @@ -153,12 +154,12 @@ func runner(runtime phoenix.Runtime) error {
natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject")
if natsURL, err := runtime.GetString("nats", "url"); err == nil {
if natsURL != "" {
channelling.DefaultNatsURL = natsURL
natsconnection.DefaultURL = natsURL
}
}
if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil {
if natsEstablishTimeout != 0 {
channelling.DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
}
}
natsClientId, _ := runtime.GetString("nats", "client_id")

Loading…
Cancel
Save