diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go index 1336976e..50b24fec 100644 --- a/go/channelling/bus_manager.go +++ b/go/channelling/bus_manager.go @@ -26,7 +26,7 @@ import ( "fmt" "log" - "github.com/nats-io/nats" + "github.com/strukturag/spreed-webrtc/go/natsconnection" ) const ( @@ -98,12 +98,12 @@ func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error type natsBus struct { id string prefix string - ec *nats.EncodedConn + ec *natsconnection.EncodedConnection triggerQueue chan *busQueueEntry } func newNatsBus(id, prefix string) (*natsBus, error) { - ec, err := EstablishNatsConnection(nil) + ec, err := natsconnection.EstablishJSONEncodedConnection(nil) if err != nil { return nil, err } @@ -145,7 +145,7 @@ type busQueueEntry struct { data interface{} } -func chPublish(ec *nats.EncodedConn, channel chan (*busQueueEntry)) { +func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntry)) { for { entry := <-channel err := ec.Publish(entry.subject, entry.data) diff --git a/go/channelling/nats.go b/go/channelling/nats.go deleted file mode 100644 index d06c4651..00000000 --- a/go/channelling/nats.go +++ /dev/null @@ -1,96 +0,0 @@ -package channelling - -import ( - "errors" - "log" - "time" - - "github.com/nats-io/nats" -) - -// DefaultNatsEstablishTimeout is the default timeout for -// calls to EstablishNatsConnection. -var DefaultNatsEstablishTimeout = 60 * time.Second - -// DefaultNatsURL is the default NATS server URL used for -// calls to NewNatsConnection and EstablishNatsConnection. -var DefaultNatsURL = nats.DefaultURL - -// NewNatsConnection creates a connetion to the default NATS server -// and tries to establish the connection. It returns the connection -// and any connection error encountered. -func NewNatsConnection() (*nats.EncodedConn, error) { - opts := &nats.Options{ - Url: DefaultNatsURL, - AllowReconnect: true, - MaxReconnect: -1, // Reconnect forever. - ReconnectWait: nats.DefaultReconnectWait, - Timeout: nats.DefaultTimeout, - PingInterval: nats.DefaultPingInterval, - MaxPingsOut: nats.DefaultMaxPingOut, - SubChanLen: nats.DefaultMaxChanLen, - ClosedCB: func(conn *nats.Conn) { - log.Println("NATS connection closed") - }, - DisconnectedCB: func(conn *nats.Conn) { - log.Println("NATS disconnected") - }, - ReconnectedCB: func(conn *nats.Conn) { - log.Println("NATS reconnected") - }, - } - nc, err := opts.Connect() - if err != nil { - return nil, err - } - ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - return nil, err - } - return ec, nil -} - -// EstablishNatsConnection is a blocking way to create and establish -// connection to the default NATS server. The function will only return -// after a timeout has reached or a connection has been established. It -// returns the connection and and any timeout error encountered. -func EstablishNatsConnection(timeout *time.Duration) (*nats.EncodedConn, error) { - if timeout == nil { - timeout = &DefaultNatsEstablishTimeout - } - connch := make(chan *nats.EncodedConn, 1) - errch := make(chan error, 1) - go func() { - notify := true - for { - ec, err := NewNatsConnection() - if err == nil { - connch <- ec - break - } - switch err { - case nats.ErrTimeout: - fallthrough - case nats.ErrNoServers: - if notify { - notify = false - log.Println("Waiting for NATS server to become available") - } - time.Sleep(1 * time.Second) - continue - default: - errch <- err - break - } - } - }() - - select { - case conn := <-connch: - return conn, nil - case err := <-errch: - return nil, err - case <-time.After(*timeout): - return nil, errors.New("NATS connection: timeout") - } -} diff --git a/go/natsconnection/natsconnection.go b/go/natsconnection/natsconnection.go new file mode 100644 index 00000000..f4acb1ae --- /dev/null +++ b/go/natsconnection/natsconnection.go @@ -0,0 +1,137 @@ +package natsconnection + +import ( + "errors" + "log" + "time" + + "github.com/nats-io/nats" +) + +// DefaultNatsEstablishTimeout is the default timeout for +// calls to EstablishNatsConnection. +var DefaultEstablishTimeout = 60 * time.Second + +// DefaultNatsURL is the default NATS server URL used for +// calls to NewConnection and EstablishConnection. +var DefaultURL = nats.DefaultURL + +// Connection implements the wrapped nats.Conn. +type Connection struct { + nats.Conn +} + +// EncodedConnection implements the wrapped nats.EncodedConn. +type EncodedConnection struct { + nats.EncodedConn +} + +// NewConnection creates a connetion to the default NATS server +// and tries to establish the connection. It returns the connection +// and any connection error encountered. +func NewConnection() (*Connection, error) { + opts := &nats.Options{ + Url: DefaultURL, + AllowReconnect: true, + MaxReconnect: -1, // Reconnect forever. + ReconnectWait: nats.DefaultReconnectWait, + Timeout: nats.DefaultTimeout, + PingInterval: nats.DefaultPingInterval, + MaxPingsOut: nats.DefaultMaxPingOut, + SubChanLen: nats.DefaultMaxChanLen, + ClosedCB: func(conn *nats.Conn) { + log.Println("NATS connection closed") + }, + DisconnectedCB: func(conn *nats.Conn) { + log.Println("NATS disconnected") + }, + ReconnectedCB: func(conn *nats.Conn) { + log.Println("NATS reconnected") + }, + } + + nc, err := opts.Connect() + if err != nil { + return nil, err + } + + return &Connection{*nc}, nil +} + +// NewJSONEncodedConnection creates a JSON-encoded connetion to the +// default NATS server and tries to establish the connection. It +// returns the JSON-encoded connection and any connection error +// encountered. +func NewJSONEncodedConnection() (*EncodedConnection, error) { + nc, err := NewConnection() + if err != nil { + return nil, err + } + ec, err := nats.NewEncodedConn(&nc.Conn, nats.JSON_ENCODER) + if err != nil { + return nil, err + } + return &EncodedConnection{*ec}, nil +} + +// EstablishConnection is a blocking way to create and establish +// connection to the default NATS server. The function will only return +// after a timeout has reached or a connection has been established. It +// returns the connection and and any timeout error encountered. +func EstablishConnection(timeout *time.Duration) (*Connection, error) { + if timeout == nil { + timeout = &DefaultEstablishTimeout + } + connch := make(chan *Connection, 1) + errch := make(chan error, 1) + go func() { + notify := true + for { + nc, err := NewConnection() + if err == nil { + connch <- nc + break + } + switch err { + case nats.ErrTimeout: + fallthrough + case nats.ErrNoServers: + if notify { + notify = false + log.Println("Waiting for NATS server to become available") + } + time.Sleep(1 * time.Second) + continue + default: + errch <- err + break + } + } + }() + + select { + case conn := <-connch: + return conn, nil + case err := <-errch: + return nil, err + case <-time.After(*timeout): + return nil, errors.New("NATS connection: timeout") + } +} + +// EstablishJSONEncodedConnection is a blocking way to create and establish +// JSON-encoded connection to the default NATS server. The function will +// only return after a timeout has reached or a connection has been +// established. It returns the JSON-encoded connection and and any timeout +// error encountered. +func EstablishJSONEncodedConnection(timeout *time.Duration) (*EncodedConnection, error) { + nc, err := EstablishConnection(timeout) + if err != nil { + return nil, err + } + ec, err := nats.NewEncodedConn(&nc.Conn, nats.JSON_ENCODER) + if err != nil { + return nil, err + } + return &EncodedConnection{*ec}, nil +} diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index e8f31e23..9d27fb48 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -41,6 +41,7 @@ import ( "github.com/strukturag/spreed-webrtc/go/channelling" "github.com/strukturag/spreed-webrtc/go/channelling/api" "github.com/strukturag/spreed-webrtc/go/channelling/server" + "github.com/strukturag/spreed-webrtc/go/natsconnection" "github.com/gorilla/mux" "github.com/strukturag/httputils" @@ -153,12 +154,12 @@ func runner(runtime phoenix.Runtime) error { natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") if natsURL, err := runtime.GetString("nats", "url"); err == nil { if natsURL != "" { - channelling.DefaultNatsURL = natsURL + natsconnection.DefaultURL = natsURL } } if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { if natsEstablishTimeout != 0 { - channelling.DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second + natsconnection.DefaultEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second } } natsClientId, _ := runtime.GetString("nats", "client_id")