WebRTC audio/video call and conferencing server.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

198 lines
5.0 KiB

package natsconnection
import (
"errors"
"log"
"time"
"github.com/nats-io/nats"
)
// DefaultEstablishTimeout is the default timeout for
// calls to EstablishNatsConnection.
var DefaultEstablishTimeout = 60 * time.Second
// DefaultRequestTimeout is the default timeout for Request() calls.
var DefaultRequestTimeout = 5 * time.Second
// DefaultURL is the default NATS server URL used for
// calls to NewConnection and EstablishConnection.
var DefaultURL = nats.DefaultURL
// Connection implements the wrapped nats.Conn.
type Connection struct {
*nats.Conn
}
// EncodedConnection implements the wrapped nats.EncodedConn.
type EncodedConnection struct {
*nats.EncodedConn
}
// NewConnection creates a connetion to the default NATS server
// and tries to establish the connection. It returns the connection
// and any connection error encountered.
func NewConnection() (*Connection, error) {
opts := &nats.Options{
Url: DefaultURL,
AllowReconnect: true,
MaxReconnect: -1, // Reconnect forever.
ReconnectWait: nats.DefaultReconnectWait,
Timeout: nats.DefaultTimeout,
PingInterval: nats.DefaultPingInterval,
MaxPingsOut: nats.DefaultMaxPingOut,
SubChanLen: nats.DefaultMaxChanLen,
ClosedCB: func(conn *nats.Conn) {
log.Println("NATS connection closed")
},
DisconnectedCB: func(conn *nats.Conn) {
log.Println("NATS disconnected")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("NATS reconnected")
},
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("NATS async error", sub, err)
},
}
nc, err := opts.Connect()
if err != nil {
return nil, err
}
return &Connection{nc}, nil
}
// NewJSONEncodedConnection creates a JSON-encoded connetion to the
// default NATS server and tries to establish the connection. It
// returns the JSON-encoded connection and any connection error
// encountered.
func NewJSONEncodedConnection() (*EncodedConnection, error) {
nc, err := NewConnection()
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(nc.Conn, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return &EncodedConnection{ec}, nil
}
// EstablishConnection is a blocking way to create and establish
// connection to the default NATS server. The function will only return
// after a timeout has reached or a connection has been established. It
// returns the connection and and any timeout error encountered.
func EstablishConnection(timeout *time.Duration) (*Connection, error) {
if timeout == nil {
timeout = &DefaultEstablishTimeout
}
connch := make(chan *Connection, 1)
errch := make(chan error, 1)
go func() {
notify := true
for {
nc, err := NewConnection()
if err == nil {
connch <- nc
break
}
switch err {
case nats.ErrTimeout:
fallthrough
case nats.ErrNoServers:
if notify {
notify = false
log.Println("Waiting for NATS server to become available")
}
time.Sleep(1 * time.Second)
continue
default:
errch <- err
break
}
}
}()
select {
case conn := <-connch:
return conn, nil
case err := <-errch:
return nil, err
case <-time.After(*timeout):
return nil, errors.New("NATS connection: timeout")
}
}
// EstablishJSONEncodedConnection is a blocking way to create and establish
// JSON-encoded connection to the default NATS server. The function will
// only return after a timeout has reached or a connection has been
// established. It returns the JSON-encoded connection and and any timeout
// error encountered.
func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, error) {
nc, err := EstablishConnection(timeout)
if err != nil {
return nil, err
}
ec, err := nats.NewEncodedConn(nc.Conn, nats.JSON_ENCODER)
if err != nil {
return nil, err
}
return &EncodedConnection{ec}, nil
}
// CallFuncWithRetry retries the given func when it does not return nil
// and the timeout duration has not been reached. It sleeps 1 second between
// each call. If the timeout is 0, the function will be retried forever.
func CallFuncWithRetry(f func() error, timeout time.Duration) error {
errch := make(chan error, 1)
quitch := make(chan bool)
var lastErr error
// Start our worker loop.
go func() {
for {
select {
case <-quitch:
// Quit requested.
return
default:
// Call our target function.
err := f()
switch err {
case nil:
// No error, success.
errch <- err
return
default:
// Remember last error.
lastErr = err
}
time.Sleep(1 * time.Second)
}
}
}()
// Create our wait channel. It either is an empty channel or
// it is filled when the timeout gets reached.
var waitch <-chan time.Time
if timeout == 0 {
// Create empty channel to wait forever.
waitch = make(<-chan time.Time)
} else {
waitch = time.After(timeout)
}
// Wait until something happens, either nil result or timeout.
select {
case err := <-errch:
return err
case <-waitch:
quitch <- true
if lastErr != nil {
return lastErr
}
return errors.New("Call with retry: timeout")
}
}