Browse Source

Major refactoring of server side code to allow isolated unit tests.

In addition to adding unit tests for the "Hello" message,
the following notable improvements are included:
 * Separate websocket callbacks from the hub via a handler API
   and adaptor.
 * Move all application specific state to the session.
 * Session no longer refers to the hub.
 * Remove redundant MessageRequest struct.
 * Hub is no longer responsible for buffer pool or
   buddy image management.
 * Consolidated connection table locking in the hub.
 * Remove redundant session table from the hub.
 * Split room join and leave into separate handlers.
   This also removes the RoomConnectionUpdate struct.
 * Entirely remove room management from the hub.
   This also provides room operations with a separate mutex.
 * Split stats into a separate service.
 * Simplify the session token handler.
 * Buddy image HTTP handler no longer takes the entire hub.
 * Centralize JSON encoding and decoding. This removes JSON
   encoding from the room worker queue.
 * Improve unicast message statistics.
 * Numerous other renamings and cleanup items.
pull/112/head
Lance Cooper 11 years ago committed by Simon Eisenmann
parent
commit
713a2b903b
  1. 18
      src/app/spreed-webrtc-server/buffercache.go
  2. 268
      src/app/spreed-webrtc-server/channelling_api.go
  3. 134
      src/app/spreed-webrtc-server/channelling_api_test.go
  4. 87
      src/app/spreed-webrtc-server/client.go
  5. 164
      src/app/spreed-webrtc-server/connection.go
  6. 566
      src/app/spreed-webrtc-server/hub.go
  7. 69
      src/app/spreed-webrtc-server/incoming_codec.go
  8. 29
      src/app/spreed-webrtc-server/main.go
  9. 170
      src/app/spreed-webrtc-server/room_manager.go
  10. 136
      src/app/spreed-webrtc-server/roomworker.go
  11. 280
      src/app/spreed-webrtc-server/server.go
  12. 28
      src/app/spreed-webrtc-server/session.go
  13. 173
      src/app/spreed-webrtc-server/session_manager.go
  14. 13
      src/app/spreed-webrtc-server/sessions.go
  15. 8
      src/app/spreed-webrtc-server/stats.go
  16. 104
      src/app/spreed-webrtc-server/stats_manager.go
  17. 130
      src/app/spreed-webrtc-server/tickets.go
  18. 32
      src/app/spreed-webrtc-server/users.go
  19. 35
      src/app/spreed-webrtc-server/ws.go

18
src/app/spreed-webrtc-server/buffercache.go

@ -160,3 +160,21 @@ func (cache *bufferCache) New() Buffer {
func (cache *bufferCache) Wrap(data []byte) Buffer { func (cache *bufferCache) Wrap(data []byte) Buffer {
return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)} return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)}
} }
func readAll(dest Buffer, r io.Reader) error {
var err error
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
_, err = dest.ReadFrom(r)
return err
}

268
src/app/spreed-webrtc-server/channelling_api.go

@ -0,0 +1,268 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"log"
"strings"
"time"
)
const (
maxConferenceSize = 100
)
type ChannellingAPI interface {
OnConnect(Client, *Session)
OnIncoming(ResponseSender, *Session, *DataIncoming)
OnDisconnect(*Session)
}
type channellingAPI struct {
version string
*Config
RoomStatusManager
SessionEncoder
SessionManager
StatsCounter
ContactManager
TurnDataCreator
Unicaster
Broadcaster
buddyImages ImageCache
}
func NewChannellingAPI(version string, config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI {
return &channellingAPI{
version,
config,
roomStatus,
sessionEncoder,
sessionManager,
statsCounter,
contactManager,
turnDataCreator,
unicaster,
broadcaster,
buddyImages,
}
}
func (api *channellingAPI) OnConnect(client Client, session *Session) {
api.Unicaster.OnConnect(client, session)
api.SendSelf(client, session)
}
func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *DataIncoming) {
switch msg.Type {
case "Self":
api.SendSelf(c, session)
case "Hello":
//log.Println("Hello", msg.Hello, c.Index())
// TODO(longsleep): Filter room id and user agent.
api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua})
if session.Hello && session.Roomid != msg.Hello.Id {
api.LeaveRoom(session)
api.Broadcast(session, session.DataSessionLeft("soft"))
}
if api.CanJoinRoom(msg.Hello.Id) {
session.Hello = true
session.Roomid = msg.Hello.Id
api.JoinRoom(session, c)
api.Broadcast(session, session.DataSessionJoined())
} else {
session.Hello = false
}
case "Offer":
// TODO(longsleep): Validate offer
api.Unicast(session, msg.Offer.To, msg.Offer)
case "Candidate":
// TODO(longsleep): Validate candidate
api.Unicast(session, msg.Candidate.To, msg.Candidate)
case "Answer":
// TODO(longsleep): Validate Answer
api.Unicast(session, msg.Answer.To, msg.Answer)
case "Users":
if session.Hello {
sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)}
c.Reply(msg.Iid, sessions)
}
case "Authentication":
st := msg.Authentication.Authentication
if st == nil {
return
}
if err := api.Authenticate(session, st, ""); err == nil {
log.Println("Authentication success", session.Userid)
api.SendSelf(c, session)
api.BroadcastSessionStatus(session)
} else {
log.Println("Authentication failed", err, st.Userid, st.Nonce)
}
case "Bye":
api.Unicast(session, msg.Bye.To, msg.Bye)
case "Status":
//log.Println("Status", msg.Status)
api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
api.BroadcastSessionStatus(session)
case "Chat":
// TODO(longsleep): Limit sent chat messages per incoming connection.
if !msg.Chat.Chat.NoEcho {
api.Unicast(session, session.Id, msg.Chat)
}
msg.Chat.Chat.Time = time.Now().Format(time.RFC3339)
if msg.Chat.To == "" {
// TODO(longsleep): Check if chat broadcast is allowed.
if session.Hello {
api.CountBroadcastChat()
api.Broadcast(session, msg.Chat)
}
} else {
if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil {
if err := api.contactrequestHandler(session, msg.Chat.To, msg.Chat.Chat.Status.ContactRequest); err != nil {
log.Println("Ignoring invalid contact request.", err)
return
}
msg.Chat.Chat.Status.ContactRequest.Userid = session.Userid()
}
if msg.Chat.Chat.Status == nil {
api.CountUnicastChat()
}
api.Unicast(session, msg.Chat.To, msg.Chat)
if msg.Chat.Chat.Mid != "" {
// Send out delivery confirmation status chat message.
api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}})
}
}
case "Conference":
// Check conference maximum size.
if len(msg.Conference.Conference) > maxConferenceSize {
log.Println("Refusing to create conference above limit.", len(msg.Conference.Conference))
} else {
// Send conference update to anyone.
for _, id := range msg.Conference.Conference {
if id != session.Id {
api.Unicast(session, id, msg.Conference)
}
}
}
case "Alive":
c.Reply(msg.Iid, msg.Alive)
case "Sessions":
var users []*DataSession
switch msg.Sessions.Sessions.Type {
case "contact":
if userID, err := api.getContactID(session, msg.Sessions.Sessions.Token); err == nil {
users = api.GetUserSessions(session, userID)
} else {
log.Printf(err.Error())
}
case "session":
id, err := session.attestation.Decode(msg.Sessions.Sessions.Token)
if err != nil {
log.Printf("Failed to decode incoming attestation", err, msg.Sessions.Sessions.Token)
break
}
session, ok := api.GetSession(id)
if !ok {
log.Printf("Cannot retrieve session for id %s", id)
break
}
users = make([]*DataSession, 1, 1)
users[0] = session.Data()
default:
log.Printf("Unkown incoming sessions request type %s", msg.Sessions.Sessions.Type)
}
// TODO(lcooper): We ought to reply with a *DataError here if failed.
if users != nil {
c.Reply(msg.Iid, &DataSessions{Type: "Sessions", Users: users, Sessions: msg.Sessions.Sessions})
}
default:
log.Println("OnText unhandled message type", msg.Type)
}
}
func (api *channellingAPI) OnDisconnect(session *Session) {
dsl := session.DataSessionLeft("hard")
if session.Hello {
api.LeaveRoom(session)
api.Broadcast(session, dsl)
}
session.RunForAllSubscribers(func(session *Session) {
log.Println("Notifying subscriber that we are gone", session.Id, session.Id)
api.Unicast(session, session.Id, dsl)
})
api.Unicaster.OnDisconnect(session)
api.buddyImages.Delete(session.Id)
}
func (api *channellingAPI) SendSelf(c Responder, session *Session) {
token, err := api.EncodeSessionToken(session)
if err != nil {
log.Println("Error in OnRegister", err)
return
}
log.Println("Created new session token", len(token), token)
self := &DataSelf{
Type: "Self",
Id: session.Id,
Sid: session.Sid,
Userid: session.Userid(),
Suserid: api.EncodeSessionUserID(session),
Token: token,
Version: api.version,
Turn: api.CreateTurnData(session),
Stun: api.StunURIs,
}
c.Reply("", self)
}
func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 {
if s.Status != nil {
status, ok := s.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := api.buddyImages.Update(session.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
return session.Update(s)
}
func (api *channellingAPI) BroadcastSessionStatus(session *Session) {
if session.Hello {
api.Broadcast(session, session.DataSessionStatus())
}
}

134
src/app/spreed-webrtc-server/channelling_api_test.go

@ -0,0 +1,134 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"testing"
)
const (
testAppVersion string = "0.0.0+unittests"
)
type fakeClient struct {
}
func (fake *fakeClient) Send(_ Buffer) {
}
func (fake *fakeClient) Reply(_ string, _ interface{}) {
}
type fakeRoomManager struct {
disallowJoin bool
joinedRoomID string
leftRoomID string
roomUsers []*DataSession
joinedID string
leftID string
broadcasts []interface{}
}
func (fake *fakeRoomManager) CanJoinRoom(roomID string) bool {
return !fake.disallowJoin
}
func (fake *fakeRoomManager) RoomUsers(session *Session) []*DataSession {
return fake.roomUsers
}
func (fake *fakeRoomManager) JoinRoom(session *Session, _ Sender) {
fake.joinedID = session.Roomid
}
func (fake *fakeRoomManager) LeaveRoom(session *Session) {
fake.leftID = session.Roomid
}
func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) {
fake.broadcasts = append(fake.broadcasts, msg)
}
func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) {
client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{}
return NewChannellingAPI(testAppVersion, nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager
}
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {
roomID, ua := "foobar", "unit tests"
api, client, session, roomManager := NewTestChannellingAPI()
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomID, Ua: ua}})
if roomManager.joinedID != roomID {
t.Errorf("Expected to have joined room %v, but got %v", roomID, roomManager.joinedID)
}
if broadcastCount := len(roomManager.broadcasts); broadcastCount != 1 {
t.Fatalf("Expected 1 broadcast, but got %d", broadcastCount)
}
dataSession, ok := roomManager.broadcasts[0].(*DataSession)
if !ok {
t.Fatal("Expected a session data broadcast")
}
if dataSession.Ua != ua {
t.Errorf("Expected to have broadcasted a user agent of %v, but was %v", ua, dataSession.Ua)
}
}
func Test_ChannellingAPI_OnIncoming_HelloMessage_LeavesAnyPreviouslyJoinedRooms(t *testing.T) {
roomID := "foobar"
api, client, session, roomManager := NewTestChannellingAPI()
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomID}})
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: "baz"}})
if roomManager.leftID != roomID {
t.Errorf("Expected to have left room %v, but got %v", roomID, roomManager.leftID)
}
if broadcastCount := len(roomManager.broadcasts); broadcastCount != 3 {
t.Fatalf("Expected 3 broadcasts, but got %d", broadcastCount)
}
dataSession, ok := roomManager.broadcasts[1].(*DataSession)
if !ok {
t.Fatal("Expected a session data broadcast")
}
if status := "soft"; dataSession.Status != status {
t.Errorf("Expected to have broadcast a leave status of of %v, but was %v", status, dataSession.Status)
}
}
func Test_ChannellingAPI_OnIncoming_HelloMessage_DoesNotJoinIfNotPermitted(t *testing.T) {
api, client, session, roomManager := NewTestChannellingAPI()
roomManager.disallowJoin = true
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{}})
if broadcastCount := len(roomManager.broadcasts); broadcastCount != 0 {
t.Fatalf("Expected no broadcasts, but got %d", broadcastCount)
}
}

87
src/app/spreed-webrtc-server/client.go

@ -0,0 +1,87 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"log"
)
type Sender interface {
Send(Buffer)
}
type ResponseSender interface {
Sender
Responder
}
type Responder interface {
Reply(iid string, m interface{})
}
type Client interface {
ResponseSender
Session() *Session
Index() uint64
Close(bool)
}
type client struct {
Codec
ChannellingAPI
Connection
session *Session
}
func NewClient(codec Codec, api ChannellingAPI, session *Session) *client {
return &client{codec, api, nil, session}
}
func (client *client) OnConnect(conn Connection) {
client.Connection = conn
client.ChannellingAPI.OnConnect(client, client.session)
}
func (client *client) OnText(b Buffer) {
if incoming, err := client.DecodeIncoming(b); err == nil {
client.OnIncoming(client, client.session, incoming)
} else {
log.Println("OnText error while decoding JSON", err)
log.Printf("JSON:\n%s\n", b)
}
}
func (client *client) OnDisconnect() {
client.ChannellingAPI.OnDisconnect(client.session)
}
func (client *client) Reply(iid string, m interface{}) {
outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m}
if b, err := client.EncodeOutgoing(outgoing); err == nil {
client.Send(b)
b.Decref()
}
}
func (client *client) Session() *Session {
return client.session
}

164
src/app/spreed-webrtc-server/connection.go

@ -22,14 +22,13 @@
package main package main
import ( import (
"bytes"
"container/list" "container/list"
"github.com/gorilla/websocket"
"io" "io"
"log" "log"
"net/http"
"sync" "sync"
"time" "time"
"github.com/gorilla/websocket"
) )
const ( const (
@ -54,110 +53,77 @@ const (
maxRatePerSecond = 20 maxRatePerSecond = 20
) )
type Connection struct { type Connection interface {
Index() uint64
Send(Buffer)
Close(runCallbacks bool)
readPump()
writePump()
}
type ConnectionHandler interface {
NewBuffer() Buffer
OnConnect(Connection)
OnText(Buffer)
OnDisconnect()
}
type connection struct {
// References. // References.
h *Hub
ws *websocket.Conn ws *websocket.Conn
request *http.Request handler ConnectionHandler
// Data handling. // Data handling.
condition *sync.Cond condition *sync.Cond
queue list.List queue list.List
mutex sync.Mutex mutex sync.Mutex
isClosed bool isClosed bool
isClosing bool
// Metadata.
Id string
Roomid string // Keep Roomid here for quick acess without locking c.Session.
Idx uint64
Session *Session
IsRegistered bool
Hello bool
Version string
}
func NewConnection(h *Hub, ws *websocket.Conn, request *http.Request) *Connection { // Debugging
Idx uint64
}
c := &Connection{ func NewConnection(index uint64, ws *websocket.Conn, handler ConnectionHandler) Connection {
h: h, c := &connection{
ws: ws, ws: ws,
request: request, handler: handler,
Idx: index,
} }
c.condition = sync.NewCond(&c.mutex) c.condition = sync.NewCond(&c.mutex)
return c return c
} }
func (c *Connection) close() { func (c *connection) Index() uint64 {
return c.Idx
}
if !c.isClosed { func (c *connection) Close(runCallbacks bool) {
c.ws.Close() c.mutex.Lock()
c.Session.Close() if c.isClosed {
c.mutex.Lock()
c.Session = nil
c.isClosed = true
for {
head := c.queue.Front()
if head == nil {
break
}
c.queue.Remove(head)
message := head.Value.(Buffer)
message.Decref()
}
c.condition.Signal()
c.mutex.Unlock() c.mutex.Unlock()
return
} }
if runCallbacks {
} c.handler.OnDisconnect()
func (c *Connection) register() error {
s := c.h.CreateSession(c.request, nil)
c.h.registerHandler(c, s)
return nil
}
func (c *Connection) reregister(token string) error {
if st, err := c.h.DecodeSessionToken(token); err == nil {
s := c.h.CreateSession(c.request, st)
c.h.registerHandler(c, s)
} else {
log.Println("Error while decoding session token", err)
c.register()
} }
return nil c.ws.Close()
c.isClosed = true
} for {
head := c.queue.Front()
func (c *Connection) unregister() { if head == nil {
c.isClosing = true break
c.h.unregisterHandler(c)
}
func (c *Connection) readAll(dest Buffer, r io.Reader) error {
var err error
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
} }
}() c.queue.Remove(head)
message := head.Value.(Buffer)
_, err = dest.ReadFrom(r) message.Decref()
return err }
c.condition.Signal()
c.mutex.Unlock()
} }
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
func (c *Connection) readPump() { func (c *connection) readPump() {
c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetPongHandler(func(string) error {
@ -165,6 +131,10 @@ func (c *Connection) readPump() {
return nil return nil
}) })
times := list.New() times := list.New()
// NOTE(lcooper): This more or less assumes that the write pump is started.
c.handler.OnConnect(c)
for { for {
//fmt.Println("readPump wait nextReader", c.Idx) //fmt.Println("readPump wait nextReader", c.Idx)
op, r, err := c.ws.NextReader() op, r, err := c.ws.NextReader()
@ -177,12 +147,6 @@ func (c *Connection) readPump() {
} }
switch op { switch op {
case websocket.TextMessage: case websocket.TextMessage:
message := c.h.buffers.New()
err = c.readAll(message, r)
if err != nil {
message.Decref()
break
}
now := time.Now() now := time.Now()
if times.Len() == maxRatePerSecond { if times.Len() == maxRatePerSecond {
front := times.Front() front := times.Front()
@ -194,18 +158,23 @@ func (c *Connection) readPump() {
} }
} }
times.PushBack(now) times.PushBack(now)
c.h.server.OnText(c, message)
message := c.handler.NewBuffer()
err = readAll(message, r)
if err != nil {
message.Decref()
break
}
c.handler.OnText(message)
message.Decref() message.Decref()
} }
} }
c.unregister() c.Close(true)
c.ws.Close()
} }
// Write message to outbound queue. // Write message to outbound queue.
func (c *Connection) send(message Buffer) { func (c *connection) Send(message Buffer) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.isClosed { if c.isClosed {
@ -223,8 +192,7 @@ func (c *Connection) send(message Buffer) {
} }
// writePump pumps messages from the queue to the websocket connection. // writePump pumps messages from the queue to the websocket connection.
func (c *Connection) writePump() { func (c *connection) writePump() {
var timer *time.Timer var timer *time.Timer
ping := false ping := false
@ -301,16 +269,16 @@ func (c *Connection) writePump() {
cleanup: cleanup:
//fmt.Println("writePump done") //fmt.Println("writePump done")
timer.Stop() timer.Stop()
c.ws.Close() c.Close(true)
} }
// Write ping message. // Write ping message.
func (c *Connection) ping() error { func (c *connection) ping() error {
return c.write(websocket.PingMessage, []byte{}) return c.write(websocket.PingMessage, []byte{})
} }
// Write writes a message with the given opCode and payload. // Write writes a message with the given opCode and payload.
func (c *Connection) write(opCode int, payload []byte) error { func (c *connection) write(opCode int, payload []byte) error {
c.ws.SetWriteDeadline(time.Now().Add(writeWait)) c.ws.SetWriteDeadline(time.Now().Add(writeWait))
return c.ws.WriteMessage(opCode, payload) return c.ws.WriteMessage(opCode, payload)
} }

566
src/app/spreed-webrtc-server/hub.go

@ -22,21 +22,16 @@
package main package main
import ( import (
"bytes"
"crypto/aes" "crypto/aes"
"crypto/hmac" "crypto/hmac"
"crypto/sha1" "crypto/sha1"
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"log" "log"
"net/http"
"strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -46,85 +41,56 @@ const (
maxUsersLength = 5000 maxUsersLength = 5000
) )
// TODO(longsleep): Get rid of MessageRequest type. type SessionStore interface {
type MessageRequest struct { GetSession(id string) (session *Session, ok bool)
From string
To string
Message Buffer
Id string
} }
type HubStat struct { type Unicaster interface {
Rooms int `json:"rooms"` SessionStore
Connections int `json:"connections"` OnConnect(Client, *Session)
Sessions int `json:"sessions"` Unicast(session *Session, to string, m interface{})
Users int `json:"users"` OnDisconnect(*Session)
Count uint64 `json:"count"`
BroadcastChatMessages uint64 `json:"broadcastchatmessages"`
UnicastChatMessages uint64 `json:"unicastchatmessages"`
IdsInRoom map[string][]string `json:"idsinroom,omitempty"`
SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"`
UsersById map[string]*DataUser `json:"usersbyid,omitempty"`
ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"`
} }
type Hub struct { type ContactManager interface {
server *Server contactrequestHandler(*Session, string, *DataContactRequest) error
connectionTable map[string]*Connection getContactID(*Session, string) (string, error)
sessionTable map[string]*Session
roomTable map[string]*RoomWorker
userTable map[string]*User
fakesessionTable map[string]*Session
version string
config *Config
sessionSecret []byte
encryptionSecret []byte
turnSecret []byte
tickets *securecookie.SecureCookie
attestations *securecookie.SecureCookie
count uint64
mutex sync.RWMutex
buffers BufferCache
broadcastChatMessages uint64
unicastChatMessages uint64
buddyImages ImageCache
realm string
tokenName string
useridRetriever func(*http.Request) (string, error)
contacts *securecookie.SecureCookie
} }
func NewHub(version string, config *Config, sessionSecret, encryptionSecret, turnSecret []byte, realm string) *Hub { type TurnDataCreator interface {
CreateTurnData(*Session) *DataTurn
h := &Hub{ }
connectionTable: make(map[string]*Connection),
sessionTable: make(map[string]*Session), type ClientStats interface {
roomTable: make(map[string]*RoomWorker), ClientInfo(details bool) (int, map[string]*DataSession, map[string]string)
userTable: make(map[string]*User), }
fakesessionTable: make(map[string]*Session),
version: version, type Hub interface {
config: config, ClientStats
sessionSecret: sessionSecret, Unicaster
encryptionSecret: encryptionSecret, TurnDataCreator
turnSecret: turnSecret, ContactManager
realm: realm, }
}
if len(h.sessionSecret) < 32 { type hub struct {
log.Printf("Weak sessionSecret (only %d bytes). It is recommended to use a key with 32 or 64 bytes.\n", len(h.sessionSecret)) OutgoingEncoder
clients map[string]Client
config *Config
turnSecret []byte
mutex sync.RWMutex
contacts *securecookie.SecureCookie
}
func NewHub(config *Config, sessionSecret, encryptionSecret, turnSecret []byte, encoder OutgoingEncoder) Hub {
h := &hub{
OutgoingEncoder: encoder,
clients: make(map[string]Client),
config: config,
turnSecret: turnSecret,
} }
h.tickets = securecookie.New(h.sessionSecret, h.encryptionSecret) h.contacts = securecookie.New(sessionSecret, encryptionSecret)
h.tickets.MaxAge(86400 * 30) // 30 days
h.tickets.HashFunc(sha256.New)
h.tickets.BlockFunc(aes.NewCipher)
h.attestations = securecookie.New(h.sessionSecret, nil)
h.attestations.MaxAge(300) // 5 minutes
h.tickets.HashFunc(sha256.New)
h.buffers = NewBufferCache(1024, bytes.MinRead)
h.buddyImages = NewImageCache()
h.tokenName = fmt.Sprintf("token@%s", h.realm)
h.contacts = securecookie.New(h.sessionSecret, h.encryptionSecret)
h.contacts.MaxAge(0) // Forever h.contacts.MaxAge(0) // Forever
h.contacts.HashFunc(sha256.New) h.contacts.HashFunc(sha256.New)
h.contacts.BlockFunc(aes.NewCipher) h.contacts.BlockFunc(aes.NewCipher)
@ -132,48 +98,27 @@ func NewHub(version string, config *Config, sessionSecret, encryptionSecret, tur
} }
func (h *Hub) Stat(details bool) *HubStat { func (h *hub) ClientInfo(details bool) (clientCount int, sessions map[string]*DataSession, connections map[string]string) {
h.mutex.RLock() h.mutex.RLock()
defer h.mutex.RUnlock() defer h.mutex.RUnlock()
stat := &HubStat{
Rooms: len(h.roomTable), clientCount = len(h.clients)
Connections: len(h.connectionTable),
Sessions: len(h.sessionTable),
Users: len(h.userTable),
Count: h.count,
BroadcastChatMessages: atomic.LoadUint64(&h.broadcastChatMessages),
UnicastChatMessages: atomic.LoadUint64(&h.unicastChatMessages),
}
if details { if details {
rooms := make(map[string][]string) sessions = make(map[string]*DataSession)
for roomid, room := range h.roomTable { for id, client := range h.clients {
sessions := make([]string, 0, len(room.connections)) sessions[id] = client.Session().Data()
for id := range room.connections {
sessions = append(sessions, id)
}
rooms[roomid] = sessions
}
stat.IdsInRoom = rooms
sessions := make(map[string]*DataSession)
for sessionid, session := range h.sessionTable {
sessions[sessionid] = session.Data()
}
stat.SessionsById = sessions
users := make(map[string]*DataUser)
for userid, user := range h.userTable {
users[userid] = user.Data()
} }
stat.UsersById = users
connections := make(map[string]string) connections = make(map[string]string)
for id, connection := range h.connectionTable { for id, client := range h.clients {
connections[fmt.Sprintf("%d", connection.Idx)] = id connections[fmt.Sprintf("%d", client.Index())] = id
} }
stat.ConnectionsByIdx = connections
} }
return stat
return
} }
func (h *Hub) CreateTurnData(id string) *DataTurn { func (h *hub) CreateTurnData(session *Session) *DataTurn {
// Create turn data credentials for shared secret auth with TURN // Create turn data credentials for shared secret auth with TURN
// server. See http://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 // server. See http://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
@ -182,6 +127,7 @@ func (h *Hub) CreateTurnData(id string) *DataTurn {
if len(h.turnSecret) == 0 { if len(h.turnSecret) == 0 {
return &DataTurn{} return &DataTurn{}
} }
id := session.Id
bar := sha256.New() bar := sha256.New()
bar.Write([]byte(id)) bar.Write([]byte(id))
id = base64.StdEncoding.EncodeToString(bar.Sum(nil)) id = base64.StdEncoding.EncodeToString(bar.Sum(nil))
@ -194,389 +140,85 @@ func (h *Hub) CreateTurnData(id string) *DataTurn {
} }
func (h *Hub) CreateSuserid(session *Session) (suserid string) { func (h *hub) GetSession(id string) (session *Session, ok bool) {
userid := session.Userid() var client Client
if userid != "" { client, ok = h.GetClient(id)
m := hmac.New(sha256.New, h.encryptionSecret) if ok {
m.Write([]byte(userid)) session = client.Session()
suserid = base64.StdEncoding.EncodeToString(m.Sum(nil))
} }
return return
} }
func (h *Hub) CreateSession(request *http.Request, st *SessionToken) *Session { func (h *hub) OnConnect(client Client, session *Session) {
// Set flags.
var session *Session
var userid string
usersEnabled := h.config.UsersEnabled
if usersEnabled && h.useridRetriever != nil {
userid, _ = h.useridRetriever(request)
}
if st == nil {
sid := NewRandomString(32)
id, _ := h.tickets.Encode("id", sid)
session = NewSession(h, id, sid)
log.Println("Created new session id", id)
} else {
if userid == "" {
userid = st.Userid
}
if !usersEnabled {
userid = ""
}
session = NewSession(h, st.Id, st.Sid)
}
if userid != "" {
h.authenticateHandler(session, st, userid)
}
return session
}
func (h *Hub) CreateFakeSession(userid string) *Session {
h.mutex.Lock()
session, ok := h.fakesessionTable[userid]
if !ok {
sid := fmt.Sprintf("fake-%s", NewRandomString(27))
id, _ := h.tickets.Encode("id", sid)
log.Println("Created new fake session id", id)
session = NewSession(h, id, sid)
session.SetUseridFake(userid)
h.fakesessionTable[userid] = session
}
h.mutex.Unlock()
return session
}
func (h *Hub) ValidateSession(id, sid string) bool {
var decoded string
err := h.tickets.Decode("id", id, &decoded)
if err != nil {
log.Println("Session validation error", err, id, sid)
return false
}
if decoded != sid {
log.Println("Session validation failed", id, sid)
return false
}
return true
}
func (h *Hub) EncodeSessionToken(st *SessionToken) (string, error) {
return h.tickets.Encode(h.tokenName, st)
}
func (h *Hub) DecodeSessionToken(token string) (*SessionToken, error) {
st := &SessionToken{}
err := h.tickets.Decode(h.tokenName, token, st)
return st, err
}
func (h *Hub) GetRoom(id string) *RoomWorker {
h.mutex.RLock()
room, ok := h.roomTable[id]
if !ok {
h.mutex.RUnlock()
h.mutex.Lock()
// Need to re-check, another thread might have created the room
// while we waited for the lock.
room, ok = h.roomTable[id]
if !ok {
room = NewRoomWorker(h, id)
h.roomTable[id] = room
h.mutex.Unlock()
go func() {
// Start room, this blocks until room expired.
room.Start()
// Cleanup room when we are done.
h.mutex.Lock()
defer h.mutex.Unlock()
delete(h.roomTable, id)
log.Printf("Cleaned up room '%s'\n", id)
}()
} else {
h.mutex.Unlock()
}
} else {
h.mutex.RUnlock()
}
return room
}
func (h *Hub) GetGlobalConnections() []*Connection {
if h.config.globalRoomid == "" {
return make([]*Connection, 0)
}
h.mutex.RLock()
if room, ok := h.roomTable[h.config.globalRoomid]; ok {
h.mutex.RUnlock()
return room.GetConnections()
}
h.mutex.RUnlock()
return make([]*Connection, 0)
}
func (h *Hub) RunForAllRooms(f func(room *RoomWorker)) {
h.mutex.RLock()
for _, room := range h.roomTable {
f(room)
}
h.mutex.RUnlock()
}
func (h *Hub) isGlobalRoomid(id string) bool {
return id != "" && (id == h.config.globalRoomid)
}
func (h *Hub) isDefaultRoomid(id string) bool {
return id == ""
}
func (h *Hub) registerHandler(c *Connection, s *Session) {
// Apply session to connection.
c.Id = s.Id
c.Session = s
h.mutex.Lock() h.mutex.Lock()
// Set flags. log.Printf("Created client with id %s", session.Id)
h.count++
c.Idx = h.count
c.IsRegistered = true
// Register connection or replace existing one. // Register connection or replace existing one.
if ec, ok := h.connectionTable[c.Id]; ok { if ec, ok := h.clients[session.Id]; ok {
ec.IsRegistered = false ec.Close(false)
ec.close()
//log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id)
} }
h.clients[session.Id] = client
h.connectionTable[c.Id] = c
h.sessionTable[c.Id] = s
//fmt.Println("registered", c.Id) //fmt.Println("registered", c.Id)
h.mutex.Unlock() h.mutex.Unlock()
//log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id)
h.server.OnRegister(c)
} }
func (h *Hub) unregisterHandler(c *Connection) { func (h *hub) OnDisconnect(session *Session) {
h.mutex.Lock() h.mutex.Lock()
if !c.IsRegistered { delete(h.clients, session.Id)
h.mutex.Unlock()
return
}
suserid := c.Session.Userid()
delete(h.connectionTable, c.Id)
delete(h.sessionTable, c.Id)
if suserid != "" {
user, ok := h.userTable[suserid]
if ok {
empty := user.RemoveSession(c.Session)
if empty {
delete(h.userTable, suserid)
}
}
}
h.mutex.Unlock() h.mutex.Unlock()
h.buddyImages.Delete(c.Id)
//log.Printf("Unregister (%d) from %s: %s\n", c.Idx, c.RemoteAddr, c.Id)
h.server.OnUnregister(c)
c.close()
} }
func (h *Hub) unicastHandler(m *MessageRequest) { func (h *hub) GetClient(id string) (client Client, ok bool) {
h.mutex.RLock() h.mutex.RLock()
out, ok := h.connectionTable[m.To] client, ok = h.clients[id]
h.mutex.RUnlock() h.mutex.RUnlock()
if !ok { return
log.Println("Unicast To not found", m.To)
return
}
out.send(m.Message)
} }
func (h *Hub) aliveHandler(c *Connection, alive *DataAlive, iid string) { func (h *hub) Unicast(session *Session, to string, m interface{}) {
outgoing := &DataOutgoing{
aliveJson := h.buffers.New() From: session.Id,
encoder := json.NewEncoder(aliveJson) To: to,
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: alive, Iid: iid}) A: session.Attestation(),
if err != nil { Data: m,
log.Println("Alive error while encoding JSON", err)
aliveJson.Decref()
return
} }
c.send(aliveJson) if message, err := h.EncodeOutgoing(outgoing); err == nil {
aliveJson.Decref() client, ok := h.GetClient(to)
}
func (h *Hub) sessionsHandler(c *Connection, srq *DataSessionsRequest, iid string) {
var users []*DataSession
switch srq.Type {
case "contact":
contact := &Contact{}
err := h.contacts.Decode("contact", srq.Token, contact)
if err != nil {
log.Println("Failed to decode incoming contact token", err, srq.Token)
return
}
// Use the userid which is not ours from the contact data.
var userid string
suserid := c.Session.Userid()
if contact.A == suserid {
userid = contact.B
} else if contact.B == suserid {
userid = contact.A
}
if userid == "" {
log.Println("Ignoring foreign contact token", contact.A, contact.B)
return
}
// Find foreign user.
h.mutex.RLock()
user, ok := h.userTable[userid]
h.mutex.RUnlock()
if !ok { if !ok {
// No user. Create fake session. log.Println("Unicast To not found", to)
users = make([]*DataSession, 1, 1)
users[0] = h.CreateFakeSession(userid).Data()
} else {
// Add sessions for forein user.
users = user.SubscribeSessions(c.Session)
}
case "session":
id, err := c.Session.attestation.Decode(srq.Token)
if err != nil {
log.Println("Failed to decode incoming attestation", err, srq.Token)
return return
} }
h.mutex.RLock() client.Send(message)
session, ok := h.sessionTable[id] message.Decref()
h.mutex.RUnlock()
if !ok {
return
}
users = make([]*DataSession, 1, 1)
users[0] = session.Data()
default:
log.Println("Unkown incoming sessions request type", srq.Type)
} }
if users != nil {
sessions := &DataSessions{Type: "Sessions", Users: users, Sessions: srq}
sessionsJson := h.buffers.New()
encoder := json.NewEncoder(sessionsJson)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: sessions, Iid: iid})
if err != nil {
log.Println("Sessions error while encoding JSON", err)
sessionsJson.Decref()
return
}
c.send(sessionsJson)
sessionsJson.Decref()
}
} }
func (h *Hub) sessionupdateHandler(s *SessionUpdate) uint64 { func (h *hub) getContactID(session *Session, token string) (userid string, err error) {
contact := &Contact{}
//fmt.Println("Userupdate", u) err = h.contacts.Decode("contact", token, contact)
h.mutex.RLock()
session, ok := h.sessionTable[s.Id]
h.mutex.RUnlock()
var rev uint64
if ok {
if s.Status != nil {
status, ok := s.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := h.buddyImages.Update(s.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
rev = session.Update(s)
} else {
log.Printf("Update data for unknown user %s\n", s.Id)
}
return rev
}
func (h *Hub) sessiontokenHandler(st *SessionToken) (string, error) {
h.mutex.RLock()
c, ok := h.connectionTable[st.Id]
h.mutex.RUnlock()
if !ok {
return "", errors.New("no such connection")
}
nonce, err := c.Session.Authorize(h.realm, st)
if err != nil { if err != nil {
return "", err err = fmt.Errorf("Failed to decode incoming contact token", err, token)
return
} }
// Use the userid which is not ours from the contact data.
return nonce, nil suserid := session.Userid()
if contact.A == suserid {
} userid = contact.B
} else if contact.B == suserid {
func (h *Hub) authenticateHandler(session *Session, st *SessionToken, userid string) error { userid = contact.A
err := session.Authenticate(h.realm, st, userid)
if err == nil {
// Authentication success.
suserid := session.Userid()
h.mutex.Lock()
user, ok := h.userTable[suserid]
if !ok {
user = NewUser(suserid)
h.userTable[suserid] = user
}
h.mutex.Unlock()
user.AddSession(session)
} }
if userid == "" {
return err err = fmt.Errorf("Ignoring foreign contact token", contact.A, contact.B)
}
return
} }
func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactRequest) error { func (h *hub) contactrequestHandler(session *Session, to string, cr *DataContactRequest) error {
var err error var err error
@ -588,13 +230,11 @@ func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactReq
if err != nil { if err != nil {
return err return err
} }
suserid := c.Session.Userid() suserid := session.Userid()
if suserid == "" { if suserid == "" {
return errors.New("no userid") return errors.New("no userid")
} }
h.mutex.RLock() session, ok := h.GetSession(to)
session, ok := h.sessionTable[to]
h.mutex.RUnlock()
if !ok { if !ok {
return errors.New("unknown to session for confirm") return errors.New("unknown to session for confirm")
} }
@ -616,13 +256,11 @@ func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactReq
} else { } else {
// New request. // New request.
// Create Token with flag and c.Session.Userid and the to Session.Userid. // Create Token with flag and c.Session.Userid and the to Session.Userid.
suserid := c.Session.Userid() suserid := session.Userid()
if suserid == "" { if suserid == "" {
return errors.New("no userid") return errors.New("no userid")
} }
h.mutex.RLock() session, ok := h.GetSession(to)
session, ok := h.sessionTable[to]
h.mutex.RUnlock()
if !ok { if !ok {
return errors.New("unknown to session") return errors.New("unknown to session")
} }

69
src/app/spreed-webrtc-server/incoming_codec.go

@ -0,0 +1,69 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"bytes"
"encoding/json"
"log"
)
type IncomingDecoder interface {
DecodeIncoming(Buffer) (*DataIncoming, error)
}
type OutgoingEncoder interface {
EncodeOutgoing(*DataOutgoing) (Buffer, error)
}
type Codec interface {
NewBuffer() Buffer
IncomingDecoder
OutgoingEncoder
}
type incomingCodec struct {
buffers BufferCache
}
func NewCodec() Codec {
return &incomingCodec{NewBufferCache(1024, bytes.MinRead)}
}
func (codec incomingCodec) NewBuffer() Buffer {
return codec.buffers.New()
}
func (codec incomingCodec) DecodeIncoming(b Buffer) (*DataIncoming, error) {
incoming := &DataIncoming{}
return incoming, json.Unmarshal(b.Bytes(), incoming)
}
func (codec incomingCodec) EncodeOutgoing(outgoing *DataOutgoing) (Buffer, error) {
b := codec.NewBuffer()
if err := json.NewEncoder(b).Encode(outgoing); err != nil {
log.Println("Error while encoding JSON", err)
b.Decref()
return nil, err
}
return b, nil
}

29
src/app/spreed-webrtc-server/main.go

@ -91,12 +91,12 @@ func roomHandler(w http.ResponseWriter, r *http.Request) {
} }
func makeImageHandler(hub *Hub, expires time.Duration) http.HandlerFunc { func makeImageHandler(buddyImages ImageCache, expires time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
image := hub.buddyImages.Get(vars["imageid"]) image := buddyImages.Get(vars["imageid"])
if image == nil { if image == nil {
http.Error(w, "Unknown image", http.StatusNotFound) http.Error(w, "Unknown image", http.StatusNotFound)
return return
@ -223,6 +223,10 @@ func runner(runtime phoenix.Runtime) error {
} }
} }
if len(sessionSecret) < 32 {
log.Printf("Weak sessionSecret (only %d bytes). It is recommended to use a key with 32 or 64 bytes.\n", len(sessionSecret))
}
var encryptionSecret []byte var encryptionSecret []byte
encryptionSecretString, err := runtime.GetString("app", "encryptionSecret") encryptionSecretString, err := runtime.GetString("app", "encryptionSecret")
if err != nil { if err != nil {
@ -371,9 +375,6 @@ func runner(runtime phoenix.Runtime) error {
// Create realm string from config. // Create realm string from config.
computedRealm := fmt.Sprintf("%s.%s", serverRealm, serverToken) computedRealm := fmt.Sprintf("%s.%s", serverRealm, serverToken)
// Create our hub instance.
hub := NewHub(runtimeVersion, config, sessionSecret, encryptionSecret, turnSecret, computedRealm)
// Set number of go routines if it is 1 // Set number of go routines if it is 1
if goruntime.GOMAXPROCS(0) == 1 { if goruntime.GOMAXPROCS(0) == 1 {
nCPU := goruntime.NumCPU() nCPU := goruntime.NumCPU()
@ -426,12 +427,20 @@ func runner(runtime phoenix.Runtime) error {
} }
// Add handlers. // Add handlers.
buddyImages := NewImageCache()
codec := NewCodec()
roomManager := NewRoomManager(config, codec)
hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec)
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager)
channellingAPI := NewChannellingAPI(runtimeVersion, config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages)
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(basePath, makeImageHandler(hub, time.Duration(24)*time.Hour))) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(basePath, makeImageHandler(buddyImages, time.Duration(24)*time.Hour)))
r.Handle("/static/{path:.*}", http.StripPrefix(basePath, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/static/{path:.*}", http.StripPrefix(basePath, httputils.FileStaticServer(http.Dir(rootFolder))))
r.Handle("/robots.txt", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static"))))) r.Handle("/robots.txt", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static")))))
r.Handle("/favicon.ico", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img"))))) r.Handle("/favicon.ico", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img")))))
r.Handle("/ws", makeWsHubHandler(hub)) r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI))
r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler)) r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
// Add API end points. // Add API end points.
@ -442,14 +451,14 @@ func runner(runtime phoenix.Runtime) error {
api.AddResourceWithWrapper(&Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") api.AddResourceWithWrapper(&Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens")
if usersEnabled { if usersEnabled {
// Create Users handler. // Create Users handler.
users := NewUsers(hub, usersMode, serverRealm, runtime) users := NewUsers(hub, tickets, sessionManager, usersMode, serverRealm, runtime)
api.AddResource(&Sessions{hub: hub, users: users}, "/sessions/{id}/") api.AddResource(&Sessions{tickets, hub, users}, "/sessions/{id}/")
if usersAllowRegistration { if usersAllowRegistration {
api.AddResource(users, "/users") api.AddResource(users, "/users")
} }
} }
if statsEnabled { if statsEnabled {
api.AddResourceWithWrapper(&Stats{hub: hub}, httputils.MakeGzipHandler, "/stats") api.AddResourceWithWrapper(&Stats{statsManager}, httputils.MakeGzipHandler, "/stats")
log.Println("Stats are enabled!") log.Println("Stats are enabled!")
} }

170
src/app/spreed-webrtc-server/room_manager.go

@ -0,0 +1,170 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"log"
"sync"
)
type RoomStatusManager interface {
CanJoinRoom(id string) bool
RoomUsers(*Session) []*DataSession
JoinRoom(*Session, Sender)
LeaveRoom(*Session)
}
type Broadcaster interface {
Broadcast(*Session, interface{})
}
type RoomStats interface {
RoomInfo(includeSessions bool) (count int, sessionInfo map[string][]string)
}
type RoomManager interface {
RoomStatusManager
Broadcaster
RoomStats
}
type roomManager struct {
sync.RWMutex
OutgoingEncoder
defaultRoomEnabled bool
globalRoomID string
roomTable map[string]RoomWorker
}
func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager {
return &roomManager{
sync.RWMutex{},
encoder,
config.DefaultRoomEnabled,
config.globalRoomid,
make(map[string]RoomWorker),
}
}
func (rooms *roomManager) CanJoinRoom(id string) bool {
return id != "" || rooms.defaultRoomEnabled
}
func (rooms *roomManager) RoomUsers(session *Session) []*DataSession {
return <-rooms.getRoomWorker(session.Roomid).GetUsers()
}
func (rooms *roomManager) JoinRoom(session *Session, sender Sender) {
rooms.getRoomWorker(session.Roomid).Join(session, sender)
}
func (rooms *roomManager) LeaveRoom(session *Session) {
rooms.getRoomWorker(session.Roomid).Leave(session)
}
func (rooms *roomManager) Broadcast(session *Session, m interface{}) {
outgoing := &DataOutgoing{
From: session.Id,
A: session.Attestation(),
Data: m,
}
message, err := rooms.EncodeOutgoing(outgoing)
if err != nil {
return
}
id := session.Roomid
if id != "" && id == rooms.globalRoomID {
rooms.RLock()
for _, room := range rooms.roomTable {
room.Broadcast(session, message)
}
rooms.RUnlock()
} else {
room := rooms.getRoomWorker(id)
room.Broadcast(session, message)
}
message.Decref()
}
func (rooms *roomManager) RoomInfo(includeSessions bool) (count int, sessionInfo map[string][]string) {
rooms.RLock()
defer rooms.RUnlock()
count = len(rooms.roomTable)
if includeSessions {
sessionInfo := make(map[string][]string)
for roomid, room := range rooms.roomTable {
sessionInfo[roomid] = room.SessionIDs()
}
}
return
}
func (rooms *roomManager) getRoomWorker(id string) RoomWorker {
rooms.RLock()
room, ok := rooms.roomTable[id]
if !ok {
rooms.RUnlock()
rooms.Lock()
// Need to re-check, another thread might have created the room
// while we waited for the lock.
room, ok = rooms.roomTable[id]
if !ok {
room = NewRoomWorker(rooms, id)
rooms.roomTable[id] = room
rooms.Unlock()
go func() {
// Start room, this blocks until room expired.
room.Start()
// Cleanup room when we are done.
rooms.Lock()
defer rooms.Unlock()
delete(rooms.roomTable, id)
log.Printf("Cleaned up room '%s'\n", id)
}()
} else {
rooms.Unlock()
}
} else {
rooms.RUnlock()
}
return room
}
func (rooms *roomManager) GlobalUsers() []*roomUser {
if rooms.globalRoomID == "" {
return make([]*roomUser, 0)
}
rooms.RLock()
if room, ok := rooms.roomTable[rooms.globalRoomID]; ok {
rooms.RUnlock()
return room.Users()
}
rooms.RUnlock()
return make([]*roomUser, 0)
}

136
src/app/spreed-webrtc-server/roomworker.go

@ -22,7 +22,6 @@
package main package main
import ( import (
"encoding/json"
"log" "log"
"sync" "sync"
"time" "time"
@ -33,39 +32,47 @@ const (
roomExpiryDuration = 60 * time.Second roomExpiryDuration = 60 * time.Second
) )
type RoomConnectionUpdate struct { type RoomWorker interface {
Id string Start()
Sessionid string SessionIDs() []string
Status bool Users() []*roomUser
Connection *Connection GetUsers() <-chan []*DataSession
Broadcast(*Session, Buffer)
Join(*Session, Sender)
Leave(*Session)
} }
type RoomWorker struct { type roomWorker struct {
// References. // References.
h *Hub manager *roomManager
// Data handling. // Data handling.
workers chan (func()) workers chan (func())
expired chan (bool) expired chan (bool)
connections map[string]*Connection users map[string]*roomUser
timer *time.Timer timer *time.Timer
mutex sync.RWMutex mutex sync.RWMutex
// Metadata. // Metadata.
Id string Id string
} }
func NewRoomWorker(h *Hub, id string) *RoomWorker { type roomUser struct {
*Session
Sender
}
func NewRoomWorker(manager *roomManager, id string) RoomWorker {
log.Printf("Creating worker for room '%s'\n", id) log.Printf("Creating worker for room '%s'\n", id)
r := &RoomWorker{ r := &roomWorker{
h: h, manager: manager,
Id: id, Id: id,
workers: make(chan func(), roomMaxWorkers),
expired: make(chan bool),
users: make(map[string]*roomUser),
} }
r.workers = make(chan func(), roomMaxWorkers)
r.expired = make(chan bool)
r.connections = make(map[string]*Connection)
// Create expire timer. // Create expire timer.
r.timer = time.AfterFunc(roomExpiryDuration, func() { r.timer = time.AfterFunc(roomExpiryDuration, func() {
@ -76,7 +83,7 @@ func NewRoomWorker(h *Hub, id string) *RoomWorker {
} }
func (r *RoomWorker) Start() { func (r *roomWorker) Start() {
// Main blocking worker. // Main blocking worker.
L: L:
@ -90,7 +97,7 @@ L:
//fmt.Println("Work room expired", r.Id) //fmt.Println("Work room expired", r.Id)
//fmt.Println("Work room expired", r.Id, len(r.connections)) //fmt.Println("Work room expired", r.Id, len(r.connections))
r.mutex.RLock() r.mutex.RLock()
if len(r.connections) == 0 { if len(r.users) == 0 {
// Cleanup room when it is empty. // Cleanup room when it is empty.
r.mutex.RUnlock() r.mutex.RUnlock()
log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id) log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id)
@ -107,19 +114,29 @@ L:
} }
func (r *RoomWorker) GetConnections() []*Connection { func (r *roomWorker) SessionIDs() []string {
r.mutex.RLock()
defer r.mutex.RUnlock()
sessions := make([]string, 0, len(r.users))
for id := range r.users {
sessions = append(sessions, id)
}
return sessions
}
func (r *roomWorker) Users() []*roomUser {
r.mutex.RLock() r.mutex.RLock()
defer r.mutex.RUnlock() defer r.mutex.RUnlock()
connections := make([]*Connection, 0, len(r.connections)) users := make([]*roomUser, 0, len(r.users))
for _, connection := range r.connections { for _, user := range r.users {
connections = append(connections, connection) users = append(users, user)
} }
return connections return users
} }
func (r *RoomWorker) Run(f func()) bool { func (r *roomWorker) Run(f func()) bool {
select { select {
case r.workers <- f: case r.workers <- f:
@ -131,13 +148,12 @@ func (r *RoomWorker) Run(f func()) bool {
} }
func (r *RoomWorker) usersHandler(c *Connection) { func (r *roomWorker) GetUsers() <-chan []*DataSession {
out := make(chan []*DataSession, 1)
worker := func() { worker := func() {
sessions := &DataSessions{Type: "Users"}
var sl []*DataSession var sl []*DataSession
appender := func(ec *Connection) bool { appender := func(user *roomUser) bool {
ecsession := ec.Session ecsession := user.Session
if ecsession != nil { if ecsession != nil {
session := ecsession.Data() session := ecsession.Data()
session.Type = "Online" session.Type = "Online"
@ -150,73 +166,65 @@ func (r *RoomWorker) usersHandler(c *Connection) {
return true return true
} }
r.mutex.RLock() r.mutex.RLock()
sl = make([]*DataSession, 0, len(r.connections)) sl = make([]*DataSession, 0, len(r.users))
// Include connections in this room. // Include connections in this room.
for _, ec := range r.connections { for _, user := range r.users {
if !appender(ec) { if !appender(user) {
break break
} }
} }
r.mutex.RUnlock() r.mutex.RUnlock()
// Include connections to global room. // Include connections to global room.
for _, ec := range c.h.GetGlobalConnections() { for _, ec := range r.manager.GlobalUsers() {
if !appender(ec) { if !appender(ec) {
break break
} }
} }
sessions.Users = sl
sessionsJson := c.h.buffers.New()
encoder := json.NewEncoder(sessionsJson)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: sessions})
if err != nil {
log.Println("Users error while encoding JSON", err)
sessionsJson.Decref()
return
}
c.send(sessionsJson)
sessionsJson.Decref()
out <- sl
} }
r.Run(worker) r.Run(worker)
return out
} }
func (r *RoomWorker) broadcastHandler(m *MessageRequest) { func (r *roomWorker) Broadcast(session *Session, message Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
defer r.mutex.RUnlock() defer r.mutex.RUnlock()
for id, ec := range r.connections { for id, user := range r.users {
if id == m.From { if id == session.Id {
// Skip broadcast to self. // Skip broadcast to self.
continue continue
} }
//fmt.Printf("%s\n", m.Message) //fmt.Printf("%s\n", m.Message)
ec.send(m.Message) user.Send(message)
} }
m.Message.Decref() message.Decref()
} }
m.Message.Incref() message.Incref()
r.Run(worker) r.Run(worker)
} }
func (r *RoomWorker) connectionHandler(rcu *RoomConnectionUpdate) { func (r *roomWorker) Join(session *Session, sender Sender) {
worker := func() {
r.mutex.Lock()
defer r.mutex.Unlock()
r.users[session.Id] = &roomUser{session, sender}
}
r.Run(worker)
}
func (r *roomWorker) Leave(session *Session) {
worker := func() { worker := func() {
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
if rcu.Status { if _, ok := r.users[session.Id]; ok {
r.connections[rcu.Sessionid] = rcu.Connection delete(r.users, session.Id)
} else {
if _, ok := r.connections[rcu.Sessionid]; ok {
delete(r.connections, rcu.Sessionid)
}
} }
} }
r.Run(worker) r.Run(worker)
} }

280
src/app/spreed-webrtc-server/server.go

@ -1,280 +0,0 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"encoding/json"
"log"
"sync/atomic"
"time"
)
const (
maxConferenceSize = 100
)
type Server struct {
}
func (s *Server) OnRegister(c *Connection) {
//log.Println("OnRegister", c.id)
if token, err := c.h.EncodeSessionToken(c.Session.Token()); err == nil {
log.Println("Created new session token", len(token), token)
// Send stuff back.
s.Unicast(c, c.Id, &DataSelf{
Type: "Self",
Id: c.Id,
Sid: c.Session.Sid,
Userid: c.Session.Userid(),
Suserid: c.h.CreateSuserid(c.Session),
Token: token,
Version: c.h.version,
Turn: c.h.CreateTurnData(c.Id),
Stun: c.h.config.StunURIs,
})
} else {
log.Println("Error in OnRegister", c.Idx, err)
}
}
func (s *Server) OnUnregister(c *Connection) {
//log.Println("OnUnregister", c.id)
dsl := c.Session.DataSessionLeft("hard")
if c.Hello {
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid})
s.Broadcast(c, dsl)
}
c.Session.RunForAllSubscribers(func(session *Session) {
log.Println("Notifying subscriber that we are gone", c.Id, session.Id)
s.Unicast(c, session.Id, dsl)
})
}
func (s *Server) OnText(c *Connection, b Buffer) {
//log.Printf("OnText from %d: %s\n", c.Id, b)
var msg DataIncoming
err := json.Unmarshal(b.Bytes(), &msg)
if err != nil {
log.Println("OnText error while decoding JSON", err)
log.Printf("JSON:\n%s\n", b)
return
}
switch msg.Type {
case "Self":
s.OnRegister(c)
case "Hello":
//log.Println("Hello", msg.Hello, c.Idx)
// TODO(longsleep): Filter room id and user agent.
s.UpdateSession(c, &SessionUpdate{Types: []string{"Roomid", "Ua"}, Roomid: msg.Hello.Id, Ua: msg.Hello.Ua})
if c.Hello && c.Roomid != msg.Hello.Id {
// Room changed.
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid})
s.Broadcast(c, c.Session.DataSessionLeft("soft"))
}
c.Roomid = msg.Hello.Id
if c.h.config.DefaultRoomEnabled || !c.h.isDefaultRoomid(c.Roomid) {
c.Hello = true
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid, Status: true})
s.Broadcast(c, c.Session.DataSessionJoined())
} else {
c.Hello = false
}
case "Offer":
// TODO(longsleep): Validate offer
s.Unicast(c, msg.Offer.To, msg.Offer)
case "Candidate":
// TODO(longsleep): Validate candidate
s.Unicast(c, msg.Candidate.To, msg.Candidate)
case "Answer":
// TODO(longsleep): Validate Answer
s.Unicast(c, msg.Answer.To, msg.Answer)
case "Users":
if c.Hello {
s.Users(c)
}
case "Authentication":
if msg.Authentication.Authentication != nil && s.Authenticate(c, msg.Authentication.Authentication) {
s.OnRegister(c)
if c.Hello {
s.Broadcast(c, c.Session.DataSessionStatus())
}
}
case "Bye":
s.Unicast(c, msg.Bye.To, msg.Bye)
case "Status":
//log.Println("Status", msg.Status)
s.UpdateSession(c, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
if c.Hello {
s.Broadcast(c, c.Session.DataSessionStatus())
}
case "Chat":
// TODO(longsleep): Limit sent chat messages per incoming connection.
if !msg.Chat.Chat.NoEcho {
s.Unicast(c, c.Id, msg.Chat)
}
msg.Chat.Chat.Time = time.Now().Format(time.RFC3339)
if msg.Chat.To == "" {
// TODO(longsleep): Check if chat broadcast is allowed.
if c.Hello {
atomic.AddUint64(&c.h.broadcastChatMessages, 1)
s.Broadcast(c, msg.Chat)
}
} else {
if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil {
err = s.ContactRequest(c, msg.Chat.To, msg.Chat.Chat.Status.ContactRequest)
if err != nil {
log.Println("Ignoring invalid contact request.", err)
return
}
msg.Chat.Chat.Status.ContactRequest.Userid = c.Session.Userid()
}
atomic.AddUint64(&c.h.unicastChatMessages, 1)
s.Unicast(c, msg.Chat.To, msg.Chat)
if msg.Chat.Chat.Mid != "" {
// Send out delivery confirmation status chat message.
s.Unicast(c, c.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}})
}
}
case "Conference":
// Check conference maximum size.
if len(msg.Conference.Conference) > maxConferenceSize {
log.Println("Refusing to create conference above limit.", len(msg.Conference.Conference))
} else {
// Send conference update to anyone.
for _, id := range msg.Conference.Conference {
if id != c.Id {
//log.Println("participant", id)
s.Unicast(c, id, msg.Conference)
}
}
}
case "Alive":
s.Alive(c, msg.Alive, msg.Iid)
case "Sessions":
s.Sessions(c, msg.Sessions.Sessions, msg.Iid)
default:
log.Println("OnText unhandled message type", msg.Type)
}
}
func (s *Server) Unicast(c *Connection, to string, m interface{}) {
outgoing := &DataOutgoing{From: c.Id, To: to, Data: m}
if !c.isClosing && c.Id != to {
outgoing.A = c.Session.Attestation()
}
b := c.h.buffers.New()
encoder := json.NewEncoder(b)
err := encoder.Encode(outgoing)
if err != nil {
b.Decref()
log.Println("Unicast error while encoding JSON", err)
return
}
//log.Println("Unicast", b)
var msg = &MessageRequest{From: c.Id, To: to, Message: b}
c.h.unicastHandler(msg)
b.Decref()
}
func (s *Server) Broadcast(c *Connection, m interface{}) {
b := c.h.buffers.New()
encoder := json.NewEncoder(b)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: m, A: c.Session.Attestation()})
if err != nil {
b.Decref()
log.Println("Broadcast error while encoding JSON", err)
return
}
if c.h.isGlobalRoomid(c.Roomid) {
c.h.RunForAllRooms(func(room *RoomWorker) {
var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id}
room.broadcastHandler(msg)
})
} else {
var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid}
room := c.h.GetRoom(c.Roomid)
room.broadcastHandler(msg)
}
b.Decref()
}
func (s *Server) Alive(c *Connection, alive *DataAlive, iid string) {
c.h.aliveHandler(c, alive, iid)
}
func (s *Server) Sessions(c *Connection, srq *DataSessionsRequest, iid string) {
c.h.sessionsHandler(c, srq, iid)
}
func (s *Server) UpdateSession(c *Connection, su *SessionUpdate) uint64 {
su.Id = c.Id
return c.h.sessionupdateHandler(su)
}
func (s *Server) ContactRequest(c *Connection, to string, cr *DataContactRequest) (err error) {
return c.h.contactrequestHandler(c, to, cr)
}
func (s *Server) Users(c *Connection) {
room := c.h.GetRoom(c.Roomid)
room.usersHandler(c)
}
func (s *Server) Authenticate(c *Connection, st *SessionToken) bool {
err := c.h.authenticateHandler(c.Session, st, "")
if err == nil {
log.Println("Authentication success", c.Id, c.Idx, c.Session.Userid)
return true
} else {
log.Println("Authentication failed", err, c.Id, c.Idx, st.Userid, st.Nonce)
return false
}
}
func (s *Server) UpdateRoomConnection(c *Connection, rcu *RoomConnectionUpdate) {
rcu.Sessionid = c.Id
rcu.Connection = c
room := c.h.GetRoom(c.Roomid)
room.connectionHandler(rcu)
}

28
src/app/spreed-webrtc-server/session.go

@ -39,26 +39,28 @@ type Session struct {
Status interface{} Status interface{}
Nonce string Nonce string
Prio int Prio int
Hello bool
Roomid string
mutex sync.RWMutex mutex sync.RWMutex
userid string userid string
fake bool fake bool
stamp int64 stamp int64
attestation *SessionAttestation attestation *SessionAttestation
attestations *securecookie.SecureCookie
subscriptions map[string]*Session subscriptions map[string]*Session
subscribers map[string]*Session subscribers map[string]*Session
h *Hub
} }
func NewSession(h *Hub, id, sid string) *Session { func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session {
session := &Session{ session := &Session{
Id: id, Id: id,
Sid: sid, Sid: sid,
Prio: 100, Prio: 100,
stamp: time.Now().Unix(), stamp: time.Now().Unix(),
attestations: attestations,
subscriptions: make(map[string]*Session), subscriptions: make(map[string]*Session),
subscribers: make(map[string]*Session), subscribers: make(map[string]*Session),
h: h,
} }
session.NewAttestation() session.NewAttestation()
return session return session
@ -288,35 +290,27 @@ func (s *Session) DataSessionStatus() *DataSession {
} }
func (s *Session) NewAttestation() { func (s *Session) NewAttestation() {
s.attestation = &SessionAttestation{ s.attestation = &SessionAttestation{
s: s, s: s,
} }
s.attestation.Update() s.attestation.Update()
} }
func (s *Session) Attestation() (attestation string) { func (s *Session) Attestation() (attestation string) {
s.mutex.RLock() s.mutex.RLock()
attestation = s.attestation.Token() attestation = s.attestation.Token()
s.mutex.RUnlock() s.mutex.RUnlock()
return return
} }
func (s *Session) UpdateAttestation() { func (s *Session) UpdateAttestation() {
s.mutex.Lock() s.mutex.Lock()
s.attestation.Update() s.attestation.Update()
s.mutex.Unlock() s.mutex.Unlock()
} }
type SessionUpdate struct { type SessionUpdate struct {
Id string
Types []string Types []string
Roomid string
Ua string Ua string
Prio int Prio int
Status interface{} Status interface{}
@ -336,39 +330,31 @@ type SessionAttestation struct {
} }
func (sa *SessionAttestation) Update() (string, error) { func (sa *SessionAttestation) Update() (string, error) {
token, err := sa.Encode() token, err := sa.Encode()
if err == nil { if err == nil {
sa.token = token sa.token = token
sa.refresh = time.Now().Unix() + 180 // expires after 3 minutes sa.refresh = time.Now().Unix() + 180 // expires after 3 minutes
} }
return token, err return token, err
} }
func (sa *SessionAttestation) Token() (token string) { func (sa *SessionAttestation) Token() (token string) {
if sa.refresh < time.Now().Unix() { if sa.refresh < time.Now().Unix() {
token, _ = sa.Update() token, _ = sa.Update()
} else { } else {
token = sa.token token = sa.token
} }
return return
} }
func (sa *SessionAttestation) Encode() (string, error) { func (sa *SessionAttestation) Encode() (string, error) {
return sa.s.attestations.Encode("attestation", sa.s.Id)
return sa.s.h.attestations.Encode("attestation", sa.s.Id)
} }
func (sa *SessionAttestation) Decode(token string) (string, error) { func (sa *SessionAttestation) Decode(token string) (string, error) {
var id string var id string
err := sa.s.h.attestations.Decode("attestation", token, &id) err := sa.s.attestations.Decode("attestation", token, &id)
return id, err return id, err
} }
func init() { func init() {

173
src/app/spreed-webrtc-server/session_manager.go

@ -0,0 +1,173 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"crypto/sha256"
"net/http"
"sync"
"github.com/gorilla/securecookie"
)
type UserStats interface {
UserInfo(bool) (int, map[string]*DataUser)
}
type SessionManager interface {
UserStats
RetrieveUsersWith(func(*http.Request) (string, error))
CreateSession(*http.Request) *Session
DestroySession(*Session)
Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession
}
type sessionManager struct {
Tickets
sync.RWMutex
config *Config
userTable map[string]*User
fakesessionTable map[string]*Session
useridRetriever func(*http.Request) (string, error)
attestations *securecookie.SecureCookie
}
func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager {
sessionManager := &sessionManager{
tickets,
sync.RWMutex{},
config,
make(map[string]*User),
make(map[string]*Session),
nil,
nil,
}
sessionManager.attestations = securecookie.New(sessionSecret, nil)
sessionManager.attestations.MaxAge(300) // 5 minutes
sessionManager.attestations.HashFunc(sha256.New)
return sessionManager
}
func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, users map[string]*DataUser) {
sessionManager.RLock()
defer sessionManager.RUnlock()
userCount = len(sessionManager.userTable)
if details {
users := make(map[string]*DataUser)
for userid, user := range sessionManager.userTable {
users[userid] = user.Data()
}
}
return
}
func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) {
sessionManager.useridRetriever = retriever
}
func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session {
request.ParseForm()
token := request.FormValue("t")
st := sessionManager.DecodeSessionToken(token)
var userid string
if sessionManager.config.UsersEnabled {
if sessionManager.useridRetriever != nil {
userid, _ = sessionManager.useridRetriever(request)
if userid == "" {
userid = st.Userid
}
}
}
session := NewSession(sessionManager.attestations, st.Id, st.Sid)
if userid != "" {
// XXX(lcooper): Should errors be handled here?
sessionManager.Authenticate(session, st, userid)
}
return session
}
func (sessionManager *sessionManager) DestroySession(session *Session) {
session.Close()
sessionManager.Lock()
if suserid := session.Userid(); suserid != "" {
user, ok := sessionManager.userTable[suserid]
if ok && user.RemoveSession(session) {
delete(sessionManager.userTable, suserid)
}
}
sessionManager.Unlock()
}
func (sessionManager *sessionManager) Authenticate(session *Session, st *SessionToken, userid string) error {
if err := session.Authenticate(sessionManager.Realm(), st, userid); err != nil {
return err
}
// Authentication success.
suserid := session.Userid()
sessionManager.Lock()
user, ok := sessionManager.userTable[suserid]
if !ok {
user = NewUser(suserid)
sessionManager.userTable[suserid] = user
}
sessionManager.Unlock()
user.AddSession(session)
return nil
}
func (sessionManager *sessionManager) GetUserSessions(session *Session, userid string) (users []*DataSession) {
var (
user *User
ok bool
)
sessionManager.RLock()
user, ok = sessionManager.userTable[userid]
sessionManager.RUnlock()
if !ok {
// No user. Create fake session.
sessionManager.Lock()
session, ok := sessionManager.fakesessionTable[userid]
if !ok {
st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid)
sessionManager.fakesessionTable[userid] = session
}
sessionManager.Unlock()
users = make([]*DataSession, 1, 1)
users[0] = session.Data()
} else {
// Add sessions for foreign user.
users = user.SubscribeSessions(session)
}
return
}

13
src/app/spreed-webrtc-server/sessions.go

@ -23,6 +23,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"errors"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"log" "log"
"net/http" "net/http"
@ -42,7 +43,8 @@ type SessionNonceRequest struct {
} }
type Sessions struct { type Sessions struct {
hub *Hub SessionValidator
SessionStore
users *Users users *Users
} }
@ -78,7 +80,7 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H
} }
// Make sure Sid matches session and is valid. // Make sure Sid matches session and is valid.
if !sessions.hub.ValidateSession(snr.Id, snr.Sid) { if !sessions.ValidateSession(snr.Id, snr.Sid) {
log.Println("Session patch failed - validation failed.") log.Println("Session patch failed - validation failed.")
error = true error = true
} }
@ -104,7 +106,12 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H
var nonce string var nonce string
if !error { if !error {
// FIXME(longsleep): Not running this might reveal error state with a timing attack. // FIXME(longsleep): Not running this might reveal error state with a timing attack.
nonce, err = sessions.hub.sessiontokenHandler(&SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) if session, ok := sessions.GetSession(snr.Id); ok {
nonce, err = session.Authorize(sessions.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else {
err = errors.New("no such session")
}
if err != nil { if err != nil {
log.Println("Session patch failed - handle failed.", err) log.Println("Session patch failed - handle failed.", err)
error = true error = true

8
src/app/spreed-webrtc-server/stats.go

@ -33,11 +33,11 @@ type Stat struct {
Hub *HubStat `json:"hub"` Hub *HubStat `json:"hub"`
} }
func NewStat(details bool, h *Hub) *Stat { func NewStat(details bool, statsGenerator StatsGenerator) *Stat {
stat := &Stat{ stat := &Stat{
details: details, details: details,
Runtime: &RuntimeStat{}, Runtime: &RuntimeStat{},
Hub: h.Stat(details), Hub: statsGenerator.Stat(details),
} }
stat.Runtime.Read() stat.Runtime.Read()
return stat return stat
@ -69,12 +69,12 @@ func (stat *RuntimeStat) Read() {
} }
type Stats struct { type Stats struct {
hub *Hub StatsGenerator
} }
func (stats *Stats) Get(request *http.Request) (int, interface{}, http.Header) { func (stats *Stats) Get(request *http.Request) (int, interface{}, http.Header) {
details := request.Form.Get("details") == "1" details := request.Form.Get("details") == "1"
return 200, NewStat(details, stats.hub), http.Header{"Content-Type": {"application/json; charset=utf-8"}, "Access-Control-Allow-Origin": {"*"}} return 200, NewStat(details, stats), http.Header{"Content-Type": {"application/json; charset=utf-8"}, "Access-Control-Allow-Origin": {"*"}}
} }

104
src/app/spreed-webrtc-server/stats_manager.go

@ -0,0 +1,104 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"sync/atomic"
)
type HubStat struct {
Rooms int `json:"rooms"`
Connections int `json:"connections"`
Sessions int `json:"sessions"`
Users int `json:"users"`
Count uint64 `json:"count"`
BroadcastChatMessages uint64 `json:"broadcastchatmessages"`
UnicastChatMessages uint64 `json:"unicastchatmessages"`
IdsInRoom map[string][]string `json:"idsinroom,omitempty"`
SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"`
UsersById map[string]*DataUser `json:"usersbyid,omitempty"`
ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"`
}
type ConnectionCounter interface {
CountConnection() uint64
}
type StatsCounter interface {
CountBroadcastChat()
CountUnicastChat()
}
type StatsGenerator interface {
Stat(details bool) *HubStat
}
type StatsManager interface {
ConnectionCounter
StatsCounter
StatsGenerator
}
type statsManager struct {
ClientStats
RoomStats
UserStats
connectionCount uint64
broadcastChatMessages uint64
unicastChatMessages uint64
}
func NewStatsManager(clientStats ClientStats, roomStats RoomStats, userStats UserStats) StatsManager {
return &statsManager{clientStats, roomStats, userStats, 0, 0, 0}
}
func (stats *statsManager) CountConnection() uint64 {
return atomic.AddUint64(&stats.connectionCount, 1)
}
func (stats *statsManager) CountBroadcastChat() {
atomic.AddUint64(&stats.broadcastChatMessages, 1)
}
func (stats *statsManager) CountUnicastChat() {
atomic.AddUint64(&stats.unicastChatMessages, 1)
}
func (stats *statsManager) Stat(details bool) *HubStat {
roomCount, roomSessionInfo := stats.RoomInfo(details)
clientCount, sessions, connections := stats.ClientInfo(details)
userCount, users := stats.UserInfo(details)
return &HubStat{
Rooms: roomCount,
Connections: clientCount,
Sessions: clientCount,
Users: userCount,
Count: atomic.LoadUint64(&stats.connectionCount),
BroadcastChatMessages: atomic.LoadUint64(&stats.broadcastChatMessages),
UnicastChatMessages: atomic.LoadUint64(&stats.unicastChatMessages),
IdsInRoom: roomSessionInfo,
SessionsById: sessions,
UsersById: users,
ConnectionsByIdx: connections,
}
}

130
src/app/spreed-webrtc-server/tickets.go

@ -0,0 +1,130 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"crypto/aes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"fmt"
"log"
"github.com/gorilla/securecookie"
)
type SessionValidator interface {
Realm() string
ValidateSession(string, string) bool
}
type SessionEncoder interface {
EncodeSessionToken(*Session) (string, error)
EncodeSessionUserID(*Session) string
}
type Tickets interface {
SessionValidator
SessionEncoder
DecodeSessionToken(token string) (st *SessionToken)
FakeSessionToken(userid string) *SessionToken
}
type tickets struct {
*securecookie.SecureCookie
realm string
tokenName string
encryptionSecret []byte
}
func NewTickets(sessionSecret, encryptionSecret []byte, realm string) Tickets {
tickets := &tickets{
nil,
realm,
fmt.Sprintf("token@%s", realm),
encryptionSecret,
}
tickets.SecureCookie = securecookie.New(sessionSecret, encryptionSecret)
tickets.MaxAge(86400 * 30) // 30 days
tickets.HashFunc(sha256.New)
tickets.BlockFunc(aes.NewCipher)
return tickets
}
func (tickets *tickets) Realm() string {
return tickets.realm
}
func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
var err error
if token != "" {
st = &SessionToken{}
err = tickets.Decode(tickets.tokenName, token, st)
if err != nil {
log.Println("Error while decoding session token", err)
}
}
if st == nil || err != nil {
sid := NewRandomString(32)
id, _ := tickets.Encode("id", sid)
st = &SessionToken{Id: id, Sid: sid}
log.Println("Created new session id", id)
}
return
}
func (tickets *tickets) FakeSessionToken(userid string) *SessionToken {
st := &SessionToken{}
st.Sid = fmt.Sprintf("fake-%s", NewRandomString(27))
st.Id, _ = tickets.Encode("id", st.Sid)
st.Userid = userid
log.Println("Created new fake session id", st.Id)
return st
}
func (tickets *tickets) ValidateSession(id, sid string) bool {
var decoded string
if err := tickets.Decode("id", id, &decoded); err != nil {
log.Println("Session validation error", err, id, sid)
return false
}
if decoded != sid {
log.Println("Session validation failed", id, sid)
return false
}
return true
}
func (tickets *tickets) EncodeSessionToken(session *Session) (string, error) {
return tickets.Encode(tickets.tokenName, session.Token())
}
func (tickets *tickets) EncodeSessionUserID(session *Session) (suserid string) {
if userid := session.Userid(); userid != "" {
m := hmac.New(sha256.New, tickets.encryptionSecret)
m.Write([]byte(userid))
suserid = base64.StdEncoding.EncodeToString(m.Sum(nil))
}
return
}

32
src/app/spreed-webrtc-server/users.go

@ -291,16 +291,21 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) {
} }
type Users struct { type Users struct {
hub *Hub SessionValidator
SessionManager
SessionStore
realm string realm string
handler UsersHandler handler UsersHandler
} }
func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users { func NewUsers(sessionStore SessionStore, sessionValidator SessionValidator, sessionManager SessionManager, mode, realm string, runtime phoenix.Runtime) *Users {
var users = &Users{ var users = &Users{
hub: hub, sessionValidator,
realm: realm, sessionManager,
sessionStore,
realm,
nil,
} }
var handler UsersHandler var handler UsersHandler
@ -309,8 +314,8 @@ func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users {
// Create handler based on mode. // Create handler based on mode.
if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil { if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil {
users.handler = handler users.handler = handler
// Register handler Get at the hub. // Register handler Get.
users.hub.useridRetriever = func(request *http.Request) (userid string, err error) { sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) {
userid, err = handler.Get(request) userid, err = handler.Get(request)
if err != nil { if err != nil {
log.Printf("Failed to get userid from handler: %s", err) log.Printf("Failed to get userid from handler: %s", err)
@ -320,7 +325,7 @@ func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users {
} }
} }
return return
} })
log.Printf("Enabled users handler '%s'\n", mode) log.Printf("Enabled users handler '%s'\n", mode)
} else if err != nil { } else if err != nil {
log.Printf("Failed to enable handler '%s': %s\n", mode, err) log.Printf("Failed to enable handler '%s': %s\n", mode, err)
@ -450,11 +455,20 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header)
userid := fmt.Sprintf("%s@%s", uuid.NewV4().String(), users.realm) userid := fmt.Sprintf("%s@%s", uuid.NewV4().String(), users.realm)
// Make sure Sid matches session and is valid. // Make sure Sid matches session and is valid.
if !users.hub.ValidateSession(snr.Id, snr.Sid) { if !users.ValidateSession(snr.Id, snr.Sid) {
return 403, NewApiError("users_invalid_session", "Invalid session"), http.Header{"Content-Type": {"application/json"}} return 403, NewApiError("users_invalid_session", "Invalid session"), http.Header{"Content-Type": {"application/json"}}
} }
nonce, err := users.hub.sessiontokenHandler(&SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) var (
nonce string
err error
)
if session, ok := users.GetSession(snr.Id); ok {
nonce, err = session.Authorize(users.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else {
err = errors.New("no such session")
}
if err != nil { if err != nil {
return 400, NewApiError("users_request_failed", fmt.Sprintf("Error: %q", err)), http.Header{"Content-Type": {"application/json"}} return 400, NewApiError("users_request_failed", fmt.Sprintf("Error: %q", err)), http.Header{"Content-Type": {"application/json"}}
} }

35
src/app/spreed-webrtc-server/ws.go

@ -22,9 +22,10 @@
package main package main
import ( import (
"github.com/gorilla/websocket"
"log" "log"
"net/http" "net/http"
"github.com/gorilla/websocket"
) )
const ( const (
@ -49,10 +50,8 @@ var (
} }
) )
func makeWsHubHandler(h *Hub) http.HandlerFunc { func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionManager, codec Codec, channellingAPI ChannellingAPI) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Validate incoming request. // Validate incoming request.
if r.Method != "GET" { if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
@ -68,30 +67,14 @@ func makeWsHubHandler(h *Hub) http.HandlerFunc {
return return
} }
// Read request details.
r.ParseForm()
token := r.FormValue("t")
// Create a new connection instance. // Create a new connection instance.
c := NewConnection(h, ws, r) session := sessionManager.CreateSession(r)
if token != "" { defer sessionManager.DestroySession(session)
if err := c.reregister(token); err != nil { client := NewClient(codec, channellingAPI, session)
log.Println(err) conn := NewConnection(connectionCounter.CountConnection(), ws, client)
w.WriteHeader(http.StatusInternalServerError)
return
}
} else {
if err := c.register(); err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
// Start pumps (readPump blocks). // Start pumps (readPump blocks).
go c.writePump() go conn.writePump()
c.readPump() conn.readPump()
} }
} }

Loading…
Cancel
Save