From 713a2b903bd3c1a453f12b359b3dea49071d76c8 Mon Sep 17 00:00:00 2001 From: Lance Cooper Date: Mon, 13 Oct 2014 16:38:09 +0200 Subject: [PATCH] Major refactoring of server side code to allow isolated unit tests. In addition to adding unit tests for the "Hello" message, the following notable improvements are included: * Separate websocket callbacks from the hub via a handler API and adaptor. * Move all application specific state to the session. * Session no longer refers to the hub. * Remove redundant MessageRequest struct. * Hub is no longer responsible for buffer pool or buddy image management. * Consolidated connection table locking in the hub. * Remove redundant session table from the hub. * Split room join and leave into separate handlers. This also removes the RoomConnectionUpdate struct. * Entirely remove room management from the hub. This also provides room operations with a separate mutex. * Split stats into a separate service. * Simplify the session token handler. * Buddy image HTTP handler no longer takes the entire hub. * Centralize JSON encoding and decoding. This removes JSON encoding from the room worker queue. * Improve unicast message statistics. * Numerous other renamings and cleanup items. --- src/app/spreed-webrtc-server/buffercache.go | 18 + .../spreed-webrtc-server/channelling_api.go | 268 +++++++++ .../channelling_api_test.go | 134 +++++ src/app/spreed-webrtc-server/client.go | 87 +++ src/app/spreed-webrtc-server/connection.go | 164 ++--- src/app/spreed-webrtc-server/hub.go | 566 ++++-------------- .../spreed-webrtc-server/incoming_codec.go | 69 +++ src/app/spreed-webrtc-server/main.go | 29 +- src/app/spreed-webrtc-server/room_manager.go | 170 ++++++ src/app/spreed-webrtc-server/roomworker.go | 136 +++-- src/app/spreed-webrtc-server/server.go | 280 --------- src/app/spreed-webrtc-server/session.go | 28 +- .../spreed-webrtc-server/session_manager.go | 173 ++++++ src/app/spreed-webrtc-server/sessions.go | 13 +- src/app/spreed-webrtc-server/stats.go | 8 +- src/app/spreed-webrtc-server/stats_manager.go | 104 ++++ src/app/spreed-webrtc-server/tickets.go | 130 ++++ src/app/spreed-webrtc-server/users.go | 32 +- src/app/spreed-webrtc-server/ws.go | 35 +- 19 files changed, 1465 insertions(+), 979 deletions(-) create mode 100644 src/app/spreed-webrtc-server/channelling_api.go create mode 100644 src/app/spreed-webrtc-server/channelling_api_test.go create mode 100644 src/app/spreed-webrtc-server/client.go create mode 100644 src/app/spreed-webrtc-server/incoming_codec.go create mode 100644 src/app/spreed-webrtc-server/room_manager.go delete mode 100644 src/app/spreed-webrtc-server/server.go create mode 100644 src/app/spreed-webrtc-server/session_manager.go create mode 100644 src/app/spreed-webrtc-server/stats_manager.go create mode 100644 src/app/spreed-webrtc-server/tickets.go diff --git a/src/app/spreed-webrtc-server/buffercache.go b/src/app/spreed-webrtc-server/buffercache.go index 75310fc9..44a797e9 100644 --- a/src/app/spreed-webrtc-server/buffercache.go +++ b/src/app/spreed-webrtc-server/buffercache.go @@ -160,3 +160,21 @@ func (cache *bufferCache) New() Buffer { func (cache *bufferCache) Wrap(data []byte) Buffer { return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)} } + +func readAll(dest Buffer, r io.Reader) error { + var err error + defer func() { + e := recover() + if e == nil { + return + } + if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { + err = panicErr + } else { + panic(e) + } + }() + + _, err = dest.ReadFrom(r) + return err +} diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go new file mode 100644 index 00000000..f6a9f3fc --- /dev/null +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -0,0 +1,268 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "log" + "strings" + "time" +) + +const ( + maxConferenceSize = 100 +) + +type ChannellingAPI interface { + OnConnect(Client, *Session) + OnIncoming(ResponseSender, *Session, *DataIncoming) + OnDisconnect(*Session) +} + +type channellingAPI struct { + version string + *Config + RoomStatusManager + SessionEncoder + SessionManager + StatsCounter + ContactManager + TurnDataCreator + Unicaster + Broadcaster + buddyImages ImageCache +} + +func NewChannellingAPI(version string, config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI { + return &channellingAPI{ + version, + config, + roomStatus, + sessionEncoder, + sessionManager, + statsCounter, + contactManager, + turnDataCreator, + unicaster, + broadcaster, + buddyImages, + } +} + +func (api *channellingAPI) OnConnect(client Client, session *Session) { + api.Unicaster.OnConnect(client, session) + api.SendSelf(client, session) +} + +func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *DataIncoming) { + switch msg.Type { + case "Self": + api.SendSelf(c, session) + case "Hello": + //log.Println("Hello", msg.Hello, c.Index()) + // TODO(longsleep): Filter room id and user agent. + api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) + if session.Hello && session.Roomid != msg.Hello.Id { + api.LeaveRoom(session) + api.Broadcast(session, session.DataSessionLeft("soft")) + } + + if api.CanJoinRoom(msg.Hello.Id) { + session.Hello = true + session.Roomid = msg.Hello.Id + api.JoinRoom(session, c) + api.Broadcast(session, session.DataSessionJoined()) + } else { + session.Hello = false + } + case "Offer": + // TODO(longsleep): Validate offer + api.Unicast(session, msg.Offer.To, msg.Offer) + case "Candidate": + // TODO(longsleep): Validate candidate + api.Unicast(session, msg.Candidate.To, msg.Candidate) + case "Answer": + // TODO(longsleep): Validate Answer + api.Unicast(session, msg.Answer.To, msg.Answer) + case "Users": + if session.Hello { + sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)} + c.Reply(msg.Iid, sessions) + } + case "Authentication": + st := msg.Authentication.Authentication + if st == nil { + return + } + + if err := api.Authenticate(session, st, ""); err == nil { + log.Println("Authentication success", session.Userid) + api.SendSelf(c, session) + api.BroadcastSessionStatus(session) + } else { + log.Println("Authentication failed", err, st.Userid, st.Nonce) + } + case "Bye": + api.Unicast(session, msg.Bye.To, msg.Bye) + case "Status": + //log.Println("Status", msg.Status) + api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) + api.BroadcastSessionStatus(session) + case "Chat": + // TODO(longsleep): Limit sent chat messages per incoming connection. + if !msg.Chat.Chat.NoEcho { + api.Unicast(session, session.Id, msg.Chat) + } + msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) + if msg.Chat.To == "" { + // TODO(longsleep): Check if chat broadcast is allowed. + if session.Hello { + api.CountBroadcastChat() + api.Broadcast(session, msg.Chat) + } + } else { + if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { + if err := api.contactrequestHandler(session, msg.Chat.To, msg.Chat.Chat.Status.ContactRequest); err != nil { + log.Println("Ignoring invalid contact request.", err) + return + } + msg.Chat.Chat.Status.ContactRequest.Userid = session.Userid() + } + if msg.Chat.Chat.Status == nil { + api.CountUnicastChat() + } + + api.Unicast(session, msg.Chat.To, msg.Chat) + if msg.Chat.Chat.Mid != "" { + // Send out delivery confirmation status chat message. + api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) + } + } + case "Conference": + // Check conference maximum size. + if len(msg.Conference.Conference) > maxConferenceSize { + log.Println("Refusing to create conference above limit.", len(msg.Conference.Conference)) + } else { + // Send conference update to anyone. + for _, id := range msg.Conference.Conference { + if id != session.Id { + api.Unicast(session, id, msg.Conference) + } + } + } + case "Alive": + c.Reply(msg.Iid, msg.Alive) + case "Sessions": + var users []*DataSession + switch msg.Sessions.Sessions.Type { + case "contact": + if userID, err := api.getContactID(session, msg.Sessions.Sessions.Token); err == nil { + users = api.GetUserSessions(session, userID) + } else { + log.Printf(err.Error()) + } + case "session": + id, err := session.attestation.Decode(msg.Sessions.Sessions.Token) + if err != nil { + log.Printf("Failed to decode incoming attestation", err, msg.Sessions.Sessions.Token) + break + } + session, ok := api.GetSession(id) + if !ok { + log.Printf("Cannot retrieve session for id %s", id) + break + } + users = make([]*DataSession, 1, 1) + users[0] = session.Data() + default: + log.Printf("Unkown incoming sessions request type %s", msg.Sessions.Sessions.Type) + } + + // TODO(lcooper): We ought to reply with a *DataError here if failed. + if users != nil { + c.Reply(msg.Iid, &DataSessions{Type: "Sessions", Users: users, Sessions: msg.Sessions.Sessions}) + } + default: + log.Println("OnText unhandled message type", msg.Type) + } +} + +func (api *channellingAPI) OnDisconnect(session *Session) { + dsl := session.DataSessionLeft("hard") + if session.Hello { + api.LeaveRoom(session) + api.Broadcast(session, dsl) + } + + session.RunForAllSubscribers(func(session *Session) { + log.Println("Notifying subscriber that we are gone", session.Id, session.Id) + api.Unicast(session, session.Id, dsl) + }) + + api.Unicaster.OnDisconnect(session) + + api.buddyImages.Delete(session.Id) +} + +func (api *channellingAPI) SendSelf(c Responder, session *Session) { + token, err := api.EncodeSessionToken(session) + if err != nil { + log.Println("Error in OnRegister", err) + return + } + + log.Println("Created new session token", len(token), token) + self := &DataSelf{ + Type: "Self", + Id: session.Id, + Sid: session.Sid, + Userid: session.Userid(), + Suserid: api.EncodeSessionUserID(session), + Token: token, + Version: api.version, + Turn: api.CreateTurnData(session), + Stun: api.StunURIs, + } + c.Reply("", self) +} + +func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 { + if s.Status != nil { + status, ok := s.Status.(map[string]interface{}) + if ok && status["buddyPicture"] != nil { + pic := status["buddyPicture"].(string) + if strings.HasPrefix(pic, "data:") { + imageId := api.buddyImages.Update(session.Id, pic[5:]) + if imageId != "" { + status["buddyPicture"] = "img:" + imageId + } + } + } + } + + return session.Update(s) +} + +func (api *channellingAPI) BroadcastSessionStatus(session *Session) { + if session.Hello { + api.Broadcast(session, session.DataSessionStatus()) + } +} diff --git a/src/app/spreed-webrtc-server/channelling_api_test.go b/src/app/spreed-webrtc-server/channelling_api_test.go new file mode 100644 index 00000000..ceec4678 --- /dev/null +++ b/src/app/spreed-webrtc-server/channelling_api_test.go @@ -0,0 +1,134 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "testing" +) + +const ( + testAppVersion string = "0.0.0+unittests" +) + +type fakeClient struct { +} + +func (fake *fakeClient) Send(_ Buffer) { +} + +func (fake *fakeClient) Reply(_ string, _ interface{}) { +} + +type fakeRoomManager struct { + disallowJoin bool + joinedRoomID string + leftRoomID string + roomUsers []*DataSession + joinedID string + leftID string + broadcasts []interface{} +} + +func (fake *fakeRoomManager) CanJoinRoom(roomID string) bool { + return !fake.disallowJoin +} + +func (fake *fakeRoomManager) RoomUsers(session *Session) []*DataSession { + return fake.roomUsers +} + +func (fake *fakeRoomManager) JoinRoom(session *Session, _ Sender) { + fake.joinedID = session.Roomid +} + +func (fake *fakeRoomManager) LeaveRoom(session *Session) { + fake.leftID = session.Roomid +} + +func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { + fake.broadcasts = append(fake.broadcasts, msg) +} + +func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { + client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} + return NewChannellingAPI(testAppVersion, nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager +} + +func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { + roomID, ua := "foobar", "unit tests" + api, client, session, roomManager := NewTestChannellingAPI() + + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomID, Ua: ua}}) + + if roomManager.joinedID != roomID { + t.Errorf("Expected to have joined room %v, but got %v", roomID, roomManager.joinedID) + } + + if broadcastCount := len(roomManager.broadcasts); broadcastCount != 1 { + t.Fatalf("Expected 1 broadcast, but got %d", broadcastCount) + } + + dataSession, ok := roomManager.broadcasts[0].(*DataSession) + if !ok { + t.Fatal("Expected a session data broadcast") + } + + if dataSession.Ua != ua { + t.Errorf("Expected to have broadcasted a user agent of %v, but was %v", ua, dataSession.Ua) + } +} + +func Test_ChannellingAPI_OnIncoming_HelloMessage_LeavesAnyPreviouslyJoinedRooms(t *testing.T) { + roomID := "foobar" + api, client, session, roomManager := NewTestChannellingAPI() + + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomID}}) + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: "baz"}}) + + if roomManager.leftID != roomID { + t.Errorf("Expected to have left room %v, but got %v", roomID, roomManager.leftID) + } + + if broadcastCount := len(roomManager.broadcasts); broadcastCount != 3 { + t.Fatalf("Expected 3 broadcasts, but got %d", broadcastCount) + } + + dataSession, ok := roomManager.broadcasts[1].(*DataSession) + if !ok { + t.Fatal("Expected a session data broadcast") + } + + if status := "soft"; dataSession.Status != status { + t.Errorf("Expected to have broadcast a leave status of of %v, but was %v", status, dataSession.Status) + } +} + +func Test_ChannellingAPI_OnIncoming_HelloMessage_DoesNotJoinIfNotPermitted(t *testing.T) { + api, client, session, roomManager := NewTestChannellingAPI() + roomManager.disallowJoin = true + + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{}}) + + if broadcastCount := len(roomManager.broadcasts); broadcastCount != 0 { + t.Fatalf("Expected no broadcasts, but got %d", broadcastCount) + } +} diff --git a/src/app/spreed-webrtc-server/client.go b/src/app/spreed-webrtc-server/client.go new file mode 100644 index 00000000..46a57694 --- /dev/null +++ b/src/app/spreed-webrtc-server/client.go @@ -0,0 +1,87 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "log" +) + +type Sender interface { + Send(Buffer) +} + +type ResponseSender interface { + Sender + Responder +} + +type Responder interface { + Reply(iid string, m interface{}) +} + +type Client interface { + ResponseSender + Session() *Session + Index() uint64 + Close(bool) +} + +type client struct { + Codec + ChannellingAPI + Connection + session *Session +} + +func NewClient(codec Codec, api ChannellingAPI, session *Session) *client { + return &client{codec, api, nil, session} +} + +func (client *client) OnConnect(conn Connection) { + client.Connection = conn + client.ChannellingAPI.OnConnect(client, client.session) +} + +func (client *client) OnText(b Buffer) { + if incoming, err := client.DecodeIncoming(b); err == nil { + client.OnIncoming(client, client.session, incoming) + } else { + log.Println("OnText error while decoding JSON", err) + log.Printf("JSON:\n%s\n", b) + } +} + +func (client *client) OnDisconnect() { + client.ChannellingAPI.OnDisconnect(client.session) +} + +func (client *client) Reply(iid string, m interface{}) { + outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} + if b, err := client.EncodeOutgoing(outgoing); err == nil { + client.Send(b) + b.Decref() + } +} + +func (client *client) Session() *Session { + return client.session +} diff --git a/src/app/spreed-webrtc-server/connection.go b/src/app/spreed-webrtc-server/connection.go index 137d41ec..e1d343fc 100644 --- a/src/app/spreed-webrtc-server/connection.go +++ b/src/app/spreed-webrtc-server/connection.go @@ -22,14 +22,13 @@ package main import ( - "bytes" "container/list" - "github.com/gorilla/websocket" "io" "log" - "net/http" "sync" "time" + + "github.com/gorilla/websocket" ) const ( @@ -54,110 +53,77 @@ const ( maxRatePerSecond = 20 ) -type Connection struct { +type Connection interface { + Index() uint64 + Send(Buffer) + Close(runCallbacks bool) + readPump() + writePump() +} + +type ConnectionHandler interface { + NewBuffer() Buffer + OnConnect(Connection) + OnText(Buffer) + OnDisconnect() +} + +type connection struct { // References. - h *Hub ws *websocket.Conn - request *http.Request + handler ConnectionHandler // Data handling. condition *sync.Cond queue list.List mutex sync.Mutex isClosed bool - isClosing bool - - // Metadata. - Id string - Roomid string // Keep Roomid here for quick acess without locking c.Session. - Idx uint64 - Session *Session - IsRegistered bool - Hello bool - Version string -} -func NewConnection(h *Hub, ws *websocket.Conn, request *http.Request) *Connection { + // Debugging + Idx uint64 +} - c := &Connection{ - h: h, +func NewConnection(index uint64, ws *websocket.Conn, handler ConnectionHandler) Connection { + c := &connection{ ws: ws, - request: request, + handler: handler, + Idx: index, } c.condition = sync.NewCond(&c.mutex) return c - } -func (c *Connection) close() { +func (c *connection) Index() uint64 { + return c.Idx +} - if !c.isClosed { - c.ws.Close() - c.Session.Close() - c.mutex.Lock() - c.Session = nil - c.isClosed = true - for { - head := c.queue.Front() - if head == nil { - break - } - c.queue.Remove(head) - message := head.Value.(Buffer) - message.Decref() - } - c.condition.Signal() +func (c *connection) Close(runCallbacks bool) { + c.mutex.Lock() + if c.isClosed { c.mutex.Unlock() + return } - -} - -func (c *Connection) register() error { - - s := c.h.CreateSession(c.request, nil) - c.h.registerHandler(c, s) - return nil -} - -func (c *Connection) reregister(token string) error { - - if st, err := c.h.DecodeSessionToken(token); err == nil { - s := c.h.CreateSession(c.request, st) - c.h.registerHandler(c, s) - } else { - log.Println("Error while decoding session token", err) - c.register() + if runCallbacks { + c.handler.OnDisconnect() } - return nil - -} - -func (c *Connection) unregister() { - c.isClosing = true - c.h.unregisterHandler(c) -} - -func (c *Connection) readAll(dest Buffer, r io.Reader) error { - var err error - defer func() { - e := recover() - if e == nil { - return - } - if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { - err = panicErr - } else { - panic(e) + c.ws.Close() + c.isClosed = true + for { + head := c.queue.Front() + if head == nil { + break } - }() - - _, err = dest.ReadFrom(r) - return err + c.queue.Remove(head) + message := head.Value.(Buffer) + message.Decref() + } + c.condition.Signal() + c.mutex.Unlock() } // readPump pumps messages from the websocket connection to the hub. -func (c *Connection) readPump() { +func (c *connection) readPump() { c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetPongHandler(func(string) error { @@ -165,6 +131,10 @@ func (c *Connection) readPump() { return nil }) times := list.New() + + // NOTE(lcooper): This more or less assumes that the write pump is started. + c.handler.OnConnect(c) + for { //fmt.Println("readPump wait nextReader", c.Idx) op, r, err := c.ws.NextReader() @@ -177,12 +147,6 @@ func (c *Connection) readPump() { } switch op { case websocket.TextMessage: - message := c.h.buffers.New() - err = c.readAll(message, r) - if err != nil { - message.Decref() - break - } now := time.Now() if times.Len() == maxRatePerSecond { front := times.Front() @@ -194,18 +158,23 @@ func (c *Connection) readPump() { } } times.PushBack(now) - c.h.server.OnText(c, message) + + message := c.handler.NewBuffer() + err = readAll(message, r) + if err != nil { + message.Decref() + break + } + c.handler.OnText(message) message.Decref() } } - c.unregister() - c.ws.Close() + c.Close(true) } // Write message to outbound queue. -func (c *Connection) send(message Buffer) { - +func (c *connection) Send(message Buffer) { c.mutex.Lock() defer c.mutex.Unlock() if c.isClosed { @@ -223,8 +192,7 @@ func (c *Connection) send(message Buffer) { } // writePump pumps messages from the queue to the websocket connection. -func (c *Connection) writePump() { - +func (c *connection) writePump() { var timer *time.Timer ping := false @@ -301,16 +269,16 @@ func (c *Connection) writePump() { cleanup: //fmt.Println("writePump done") timer.Stop() - c.ws.Close() + c.Close(true) } // Write ping message. -func (c *Connection) ping() error { +func (c *connection) ping() error { return c.write(websocket.PingMessage, []byte{}) } // Write writes a message with the given opCode and payload. -func (c *Connection) write(opCode int, payload []byte) error { +func (c *connection) write(opCode int, payload []byte) error { c.ws.SetWriteDeadline(time.Now().Add(writeWait)) return c.ws.WriteMessage(opCode, payload) } diff --git a/src/app/spreed-webrtc-server/hub.go b/src/app/spreed-webrtc-server/hub.go index f8d75546..9c1d1742 100644 --- a/src/app/spreed-webrtc-server/hub.go +++ b/src/app/spreed-webrtc-server/hub.go @@ -22,21 +22,16 @@ package main import ( - "bytes" "crypto/aes" "crypto/hmac" "crypto/sha1" "crypto/sha256" "encoding/base64" - "encoding/json" "errors" "fmt" "github.com/gorilla/securecookie" "log" - "net/http" - "strings" "sync" - "sync/atomic" "time" ) @@ -46,85 +41,56 @@ const ( maxUsersLength = 5000 ) -// TODO(longsleep): Get rid of MessageRequest type. -type MessageRequest struct { - From string - To string - Message Buffer - Id string +type SessionStore interface { + GetSession(id string) (session *Session, ok bool) } -type HubStat struct { - Rooms int `json:"rooms"` - Connections int `json:"connections"` - Sessions int `json:"sessions"` - Users int `json:"users"` - Count uint64 `json:"count"` - BroadcastChatMessages uint64 `json:"broadcastchatmessages"` - UnicastChatMessages uint64 `json:"unicastchatmessages"` - IdsInRoom map[string][]string `json:"idsinroom,omitempty"` - SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"` - UsersById map[string]*DataUser `json:"usersbyid,omitempty"` - ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"` +type Unicaster interface { + SessionStore + OnConnect(Client, *Session) + Unicast(session *Session, to string, m interface{}) + OnDisconnect(*Session) } -type Hub struct { - server *Server - connectionTable map[string]*Connection - sessionTable map[string]*Session - roomTable map[string]*RoomWorker - userTable map[string]*User - fakesessionTable map[string]*Session - version string - config *Config - sessionSecret []byte - encryptionSecret []byte - turnSecret []byte - tickets *securecookie.SecureCookie - attestations *securecookie.SecureCookie - count uint64 - mutex sync.RWMutex - buffers BufferCache - broadcastChatMessages uint64 - unicastChatMessages uint64 - buddyImages ImageCache - realm string - tokenName string - useridRetriever func(*http.Request) (string, error) - contacts *securecookie.SecureCookie +type ContactManager interface { + contactrequestHandler(*Session, string, *DataContactRequest) error + getContactID(*Session, string) (string, error) } -func NewHub(version string, config *Config, sessionSecret, encryptionSecret, turnSecret []byte, realm string) *Hub { - - h := &Hub{ - connectionTable: make(map[string]*Connection), - sessionTable: make(map[string]*Session), - roomTable: make(map[string]*RoomWorker), - userTable: make(map[string]*User), - fakesessionTable: make(map[string]*Session), - version: version, - config: config, - sessionSecret: sessionSecret, - encryptionSecret: encryptionSecret, - turnSecret: turnSecret, - realm: realm, - } +type TurnDataCreator interface { + CreateTurnData(*Session) *DataTurn +} + +type ClientStats interface { + ClientInfo(details bool) (int, map[string]*DataSession, map[string]string) +} + +type Hub interface { + ClientStats + Unicaster + TurnDataCreator + ContactManager +} - if len(h.sessionSecret) < 32 { - log.Printf("Weak sessionSecret (only %d bytes). It is recommended to use a key with 32 or 64 bytes.\n", len(h.sessionSecret)) +type hub struct { + OutgoingEncoder + clients map[string]Client + config *Config + turnSecret []byte + mutex sync.RWMutex + contacts *securecookie.SecureCookie +} + +func NewHub(config *Config, sessionSecret, encryptionSecret, turnSecret []byte, encoder OutgoingEncoder) Hub { + + h := &hub{ + OutgoingEncoder: encoder, + clients: make(map[string]Client), + config: config, + turnSecret: turnSecret, } - h.tickets = securecookie.New(h.sessionSecret, h.encryptionSecret) - h.tickets.MaxAge(86400 * 30) // 30 days - h.tickets.HashFunc(sha256.New) - h.tickets.BlockFunc(aes.NewCipher) - h.attestations = securecookie.New(h.sessionSecret, nil) - h.attestations.MaxAge(300) // 5 minutes - h.tickets.HashFunc(sha256.New) - h.buffers = NewBufferCache(1024, bytes.MinRead) - h.buddyImages = NewImageCache() - h.tokenName = fmt.Sprintf("token@%s", h.realm) - h.contacts = securecookie.New(h.sessionSecret, h.encryptionSecret) + h.contacts = securecookie.New(sessionSecret, encryptionSecret) h.contacts.MaxAge(0) // Forever h.contacts.HashFunc(sha256.New) h.contacts.BlockFunc(aes.NewCipher) @@ -132,48 +98,27 @@ func NewHub(version string, config *Config, sessionSecret, encryptionSecret, tur } -func (h *Hub) Stat(details bool) *HubStat { +func (h *hub) ClientInfo(details bool) (clientCount int, sessions map[string]*DataSession, connections map[string]string) { h.mutex.RLock() defer h.mutex.RUnlock() - stat := &HubStat{ - Rooms: len(h.roomTable), - Connections: len(h.connectionTable), - Sessions: len(h.sessionTable), - Users: len(h.userTable), - Count: h.count, - BroadcastChatMessages: atomic.LoadUint64(&h.broadcastChatMessages), - UnicastChatMessages: atomic.LoadUint64(&h.unicastChatMessages), - } + + clientCount = len(h.clients) if details { - rooms := make(map[string][]string) - for roomid, room := range h.roomTable { - sessions := make([]string, 0, len(room.connections)) - for id := range room.connections { - sessions = append(sessions, id) - } - rooms[roomid] = sessions - } - stat.IdsInRoom = rooms - sessions := make(map[string]*DataSession) - for sessionid, session := range h.sessionTable { - sessions[sessionid] = session.Data() - } - stat.SessionsById = sessions - users := make(map[string]*DataUser) - for userid, user := range h.userTable { - users[userid] = user.Data() + sessions = make(map[string]*DataSession) + for id, client := range h.clients { + sessions[id] = client.Session().Data() } - stat.UsersById = users - connections := make(map[string]string) - for id, connection := range h.connectionTable { - connections[fmt.Sprintf("%d", connection.Idx)] = id + + connections = make(map[string]string) + for id, client := range h.clients { + connections[fmt.Sprintf("%d", client.Index())] = id } - stat.ConnectionsByIdx = connections } - return stat + + return } -func (h *Hub) CreateTurnData(id string) *DataTurn { +func (h *hub) CreateTurnData(session *Session) *DataTurn { // Create turn data credentials for shared secret auth with TURN // server. See http://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 @@ -182,6 +127,7 @@ func (h *Hub) CreateTurnData(id string) *DataTurn { if len(h.turnSecret) == 0 { return &DataTurn{} } + id := session.Id bar := sha256.New() bar.Write([]byte(id)) id = base64.StdEncoding.EncodeToString(bar.Sum(nil)) @@ -194,389 +140,85 @@ func (h *Hub) CreateTurnData(id string) *DataTurn { } -func (h *Hub) CreateSuserid(session *Session) (suserid string) { - userid := session.Userid() - if userid != "" { - m := hmac.New(sha256.New, h.encryptionSecret) - m.Write([]byte(userid)) - suserid = base64.StdEncoding.EncodeToString(m.Sum(nil)) +func (h *hub) GetSession(id string) (session *Session, ok bool) { + var client Client + client, ok = h.GetClient(id) + if ok { + session = client.Session() } return } -func (h *Hub) CreateSession(request *http.Request, st *SessionToken) *Session { - - var session *Session - var userid string - usersEnabled := h.config.UsersEnabled - - if usersEnabled && h.useridRetriever != nil { - userid, _ = h.useridRetriever(request) - } - - if st == nil { - sid := NewRandomString(32) - id, _ := h.tickets.Encode("id", sid) - session = NewSession(h, id, sid) - log.Println("Created new session id", id) - } else { - if userid == "" { - userid = st.Userid - } - if !usersEnabled { - userid = "" - } - session = NewSession(h, st.Id, st.Sid) - } - - if userid != "" { - h.authenticateHandler(session, st, userid) - } - - return session - -} - -func (h *Hub) CreateFakeSession(userid string) *Session { - - h.mutex.Lock() - session, ok := h.fakesessionTable[userid] - if !ok { - sid := fmt.Sprintf("fake-%s", NewRandomString(27)) - id, _ := h.tickets.Encode("id", sid) - log.Println("Created new fake session id", id) - session = NewSession(h, id, sid) - session.SetUseridFake(userid) - h.fakesessionTable[userid] = session - } - h.mutex.Unlock() - return session - -} - -func (h *Hub) ValidateSession(id, sid string) bool { - - var decoded string - err := h.tickets.Decode("id", id, &decoded) - if err != nil { - log.Println("Session validation error", err, id, sid) - return false - } - if decoded != sid { - log.Println("Session validation failed", id, sid) - return false - } - return true - -} - -func (h *Hub) EncodeSessionToken(st *SessionToken) (string, error) { - - return h.tickets.Encode(h.tokenName, st) - -} - -func (h *Hub) DecodeSessionToken(token string) (*SessionToken, error) { - - st := &SessionToken{} - err := h.tickets.Decode(h.tokenName, token, st) - return st, err - -} - -func (h *Hub) GetRoom(id string) *RoomWorker { - - h.mutex.RLock() - room, ok := h.roomTable[id] - if !ok { - h.mutex.RUnlock() - h.mutex.Lock() - // Need to re-check, another thread might have created the room - // while we waited for the lock. - room, ok = h.roomTable[id] - if !ok { - room = NewRoomWorker(h, id) - h.roomTable[id] = room - h.mutex.Unlock() - go func() { - // Start room, this blocks until room expired. - room.Start() - // Cleanup room when we are done. - h.mutex.Lock() - defer h.mutex.Unlock() - delete(h.roomTable, id) - log.Printf("Cleaned up room '%s'\n", id) - }() - } else { - h.mutex.Unlock() - } - } else { - h.mutex.RUnlock() - } - - return room - -} - -func (h *Hub) GetGlobalConnections() []*Connection { - - if h.config.globalRoomid == "" { - return make([]*Connection, 0) - } - h.mutex.RLock() - if room, ok := h.roomTable[h.config.globalRoomid]; ok { - h.mutex.RUnlock() - return room.GetConnections() - } - - h.mutex.RUnlock() - return make([]*Connection, 0) - -} - -func (h *Hub) RunForAllRooms(f func(room *RoomWorker)) { - - h.mutex.RLock() - for _, room := range h.roomTable { - f(room) - } - h.mutex.RUnlock() - -} - -func (h *Hub) isGlobalRoomid(id string) bool { - - return id != "" && (id == h.config.globalRoomid) - -} - -func (h *Hub) isDefaultRoomid(id string) bool { - - return id == "" -} - -func (h *Hub) registerHandler(c *Connection, s *Session) { - - // Apply session to connection. - c.Id = s.Id - c.Session = s +func (h *hub) OnConnect(client Client, session *Session) { + // Set flags. h.mutex.Lock() - // Set flags. - h.count++ - c.Idx = h.count - c.IsRegistered = true + log.Printf("Created client with id %s", session.Id) // Register connection or replace existing one. - if ec, ok := h.connectionTable[c.Id]; ok { - ec.IsRegistered = false - ec.close() + if ec, ok := h.clients[session.Id]; ok { + ec.Close(false) //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) } - - h.connectionTable[c.Id] = c - h.sessionTable[c.Id] = s + h.clients[session.Id] = client //fmt.Println("registered", c.Id) h.mutex.Unlock() //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) - h.server.OnRegister(c) - } -func (h *Hub) unregisterHandler(c *Connection) { - +func (h *hub) OnDisconnect(session *Session) { h.mutex.Lock() - if !c.IsRegistered { - h.mutex.Unlock() - return - } - suserid := c.Session.Userid() - delete(h.connectionTable, c.Id) - delete(h.sessionTable, c.Id) - if suserid != "" { - user, ok := h.userTable[suserid] - if ok { - empty := user.RemoveSession(c.Session) - if empty { - delete(h.userTable, suserid) - } - } - } + delete(h.clients, session.Id) h.mutex.Unlock() - h.buddyImages.Delete(c.Id) - //log.Printf("Unregister (%d) from %s: %s\n", c.Idx, c.RemoteAddr, c.Id) - h.server.OnUnregister(c) - c.close() - } -func (h *Hub) unicastHandler(m *MessageRequest) { - +func (h *hub) GetClient(id string) (client Client, ok bool) { h.mutex.RLock() - out, ok := h.connectionTable[m.To] + client, ok = h.clients[id] h.mutex.RUnlock() - if !ok { - log.Println("Unicast To not found", m.To) - return - } - out.send(m.Message) - + return } -func (h *Hub) aliveHandler(c *Connection, alive *DataAlive, iid string) { - - aliveJson := h.buffers.New() - encoder := json.NewEncoder(aliveJson) - err := encoder.Encode(&DataOutgoing{From: c.Id, Data: alive, Iid: iid}) - if err != nil { - log.Println("Alive error while encoding JSON", err) - aliveJson.Decref() - return +func (h *hub) Unicast(session *Session, to string, m interface{}) { + outgoing := &DataOutgoing{ + From: session.Id, + To: to, + A: session.Attestation(), + Data: m, } - c.send(aliveJson) - aliveJson.Decref() - -} - -func (h *Hub) sessionsHandler(c *Connection, srq *DataSessionsRequest, iid string) { - - var users []*DataSession - - switch srq.Type { - case "contact": - contact := &Contact{} - err := h.contacts.Decode("contact", srq.Token, contact) - if err != nil { - log.Println("Failed to decode incoming contact token", err, srq.Token) - return - } - // Use the userid which is not ours from the contact data. - var userid string - suserid := c.Session.Userid() - if contact.A == suserid { - userid = contact.B - } else if contact.B == suserid { - userid = contact.A - } - if userid == "" { - log.Println("Ignoring foreign contact token", contact.A, contact.B) - return - } - // Find foreign user. - h.mutex.RLock() - user, ok := h.userTable[userid] - h.mutex.RUnlock() + if message, err := h.EncodeOutgoing(outgoing); err == nil { + client, ok := h.GetClient(to) if !ok { - // No user. Create fake session. - users = make([]*DataSession, 1, 1) - users[0] = h.CreateFakeSession(userid).Data() - } else { - // Add sessions for forein user. - users = user.SubscribeSessions(c.Session) - } - case "session": - id, err := c.Session.attestation.Decode(srq.Token) - if err != nil { - log.Println("Failed to decode incoming attestation", err, srq.Token) + log.Println("Unicast To not found", to) return } - h.mutex.RLock() - session, ok := h.sessionTable[id] - h.mutex.RUnlock() - if !ok { - return - } - users = make([]*DataSession, 1, 1) - users[0] = session.Data() - default: - log.Println("Unkown incoming sessions request type", srq.Type) + client.Send(message) + message.Decref() } - - if users != nil { - sessions := &DataSessions{Type: "Sessions", Users: users, Sessions: srq} - sessionsJson := h.buffers.New() - encoder := json.NewEncoder(sessionsJson) - err := encoder.Encode(&DataOutgoing{From: c.Id, Data: sessions, Iid: iid}) - if err != nil { - log.Println("Sessions error while encoding JSON", err) - sessionsJson.Decref() - return - } - c.send(sessionsJson) - sessionsJson.Decref() - } - } -func (h *Hub) sessionupdateHandler(s *SessionUpdate) uint64 { - - //fmt.Println("Userupdate", u) - h.mutex.RLock() - session, ok := h.sessionTable[s.Id] - h.mutex.RUnlock() - var rev uint64 - if ok { - if s.Status != nil { - status, ok := s.Status.(map[string]interface{}) - if ok && status["buddyPicture"] != nil { - pic := status["buddyPicture"].(string) - if strings.HasPrefix(pic, "data:") { - imageId := h.buddyImages.Update(s.Id, pic[5:]) - if imageId != "" { - status["buddyPicture"] = "img:" + imageId - } - } - } - } - rev = session.Update(s) - } else { - log.Printf("Update data for unknown user %s\n", s.Id) - } - return rev - -} - -func (h *Hub) sessiontokenHandler(st *SessionToken) (string, error) { - - h.mutex.RLock() - c, ok := h.connectionTable[st.Id] - h.mutex.RUnlock() - - if !ok { - return "", errors.New("no such connection") - } - - nonce, err := c.Session.Authorize(h.realm, st) +func (h *hub) getContactID(session *Session, token string) (userid string, err error) { + contact := &Contact{} + err = h.contacts.Decode("contact", token, contact) if err != nil { - return "", err + err = fmt.Errorf("Failed to decode incoming contact token", err, token) + return } - - return nonce, nil - -} - -func (h *Hub) authenticateHandler(session *Session, st *SessionToken, userid string) error { - - err := session.Authenticate(h.realm, st, userid) - if err == nil { - // Authentication success. - suserid := session.Userid() - h.mutex.Lock() - user, ok := h.userTable[suserid] - if !ok { - user = NewUser(suserid) - h.userTable[suserid] = user - } - h.mutex.Unlock() - user.AddSession(session) + // Use the userid which is not ours from the contact data. + suserid := session.Userid() + if contact.A == suserid { + userid = contact.B + } else if contact.B == suserid { + userid = contact.A } - - return err - + if userid == "" { + err = fmt.Errorf("Ignoring foreign contact token", contact.A, contact.B) + } + return } -func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactRequest) error { +func (h *hub) contactrequestHandler(session *Session, to string, cr *DataContactRequest) error { var err error @@ -588,13 +230,11 @@ func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactReq if err != nil { return err } - suserid := c.Session.Userid() + suserid := session.Userid() if suserid == "" { return errors.New("no userid") } - h.mutex.RLock() - session, ok := h.sessionTable[to] - h.mutex.RUnlock() + session, ok := h.GetSession(to) if !ok { return errors.New("unknown to session for confirm") } @@ -616,13 +256,11 @@ func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactReq } else { // New request. // Create Token with flag and c.Session.Userid and the to Session.Userid. - suserid := c.Session.Userid() + suserid := session.Userid() if suserid == "" { return errors.New("no userid") } - h.mutex.RLock() - session, ok := h.sessionTable[to] - h.mutex.RUnlock() + session, ok := h.GetSession(to) if !ok { return errors.New("unknown to session") } diff --git a/src/app/spreed-webrtc-server/incoming_codec.go b/src/app/spreed-webrtc-server/incoming_codec.go new file mode 100644 index 00000000..dba9c2d5 --- /dev/null +++ b/src/app/spreed-webrtc-server/incoming_codec.go @@ -0,0 +1,69 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "bytes" + "encoding/json" + "log" +) + +type IncomingDecoder interface { + DecodeIncoming(Buffer) (*DataIncoming, error) +} + +type OutgoingEncoder interface { + EncodeOutgoing(*DataOutgoing) (Buffer, error) +} + +type Codec interface { + NewBuffer() Buffer + IncomingDecoder + OutgoingEncoder +} + +type incomingCodec struct { + buffers BufferCache +} + +func NewCodec() Codec { + return &incomingCodec{NewBufferCache(1024, bytes.MinRead)} +} + +func (codec incomingCodec) NewBuffer() Buffer { + return codec.buffers.New() +} + +func (codec incomingCodec) DecodeIncoming(b Buffer) (*DataIncoming, error) { + incoming := &DataIncoming{} + return incoming, json.Unmarshal(b.Bytes(), incoming) +} + +func (codec incomingCodec) EncodeOutgoing(outgoing *DataOutgoing) (Buffer, error) { + b := codec.NewBuffer() + if err := json.NewEncoder(b).Encode(outgoing); err != nil { + log.Println("Error while encoding JSON", err) + b.Decref() + return nil, err + } + return b, nil +} diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 50db95c8..5bcbffb2 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -91,12 +91,12 @@ func roomHandler(w http.ResponseWriter, r *http.Request) { } -func makeImageHandler(hub *Hub, expires time.Duration) http.HandlerFunc { +func makeImageHandler(buddyImages ImageCache, expires time.Duration) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - image := hub.buddyImages.Get(vars["imageid"]) + image := buddyImages.Get(vars["imageid"]) if image == nil { http.Error(w, "Unknown image", http.StatusNotFound) return @@ -223,6 +223,10 @@ func runner(runtime phoenix.Runtime) error { } } + if len(sessionSecret) < 32 { + log.Printf("Weak sessionSecret (only %d bytes). It is recommended to use a key with 32 or 64 bytes.\n", len(sessionSecret)) + } + var encryptionSecret []byte encryptionSecretString, err := runtime.GetString("app", "encryptionSecret") if err != nil { @@ -371,9 +375,6 @@ func runner(runtime phoenix.Runtime) error { // Create realm string from config. computedRealm := fmt.Sprintf("%s.%s", serverRealm, serverToken) - // Create our hub instance. - hub := NewHub(runtimeVersion, config, sessionSecret, encryptionSecret, turnSecret, computedRealm) - // Set number of go routines if it is 1 if goruntime.GOMAXPROCS(0) == 1 { nCPU := goruntime.NumCPU() @@ -426,12 +427,20 @@ func runner(runtime phoenix.Runtime) error { } // Add handlers. + buddyImages := NewImageCache() + codec := NewCodec() + roomManager := NewRoomManager(config, codec) + hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) + tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) + sessionManager := NewSessionManager(config, tickets, sessionSecret) + statsManager := NewStatsManager(hub, roomManager, sessionManager) + channellingAPI := NewChannellingAPI(runtimeVersion, config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) - r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(basePath, makeImageHandler(hub, time.Duration(24)*time.Hour))) + r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(basePath, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/{path:.*}", http.StripPrefix(basePath, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/robots.txt", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static"))))) r.Handle("/favicon.ico", http.StripPrefix(basePath, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img"))))) - r.Handle("/ws", makeWsHubHandler(hub)) + r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI)) r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler)) // Add API end points. @@ -442,14 +451,14 @@ func runner(runtime phoenix.Runtime) error { api.AddResourceWithWrapper(&Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") if usersEnabled { // Create Users handler. - users := NewUsers(hub, usersMode, serverRealm, runtime) - api.AddResource(&Sessions{hub: hub, users: users}, "/sessions/{id}/") + users := NewUsers(hub, tickets, sessionManager, usersMode, serverRealm, runtime) + api.AddResource(&Sessions{tickets, hub, users}, "/sessions/{id}/") if usersAllowRegistration { api.AddResource(users, "/users") } } if statsEnabled { - api.AddResourceWithWrapper(&Stats{hub: hub}, httputils.MakeGzipHandler, "/stats") + api.AddResourceWithWrapper(&Stats{statsManager}, httputils.MakeGzipHandler, "/stats") log.Println("Stats are enabled!") } diff --git a/src/app/spreed-webrtc-server/room_manager.go b/src/app/spreed-webrtc-server/room_manager.go new file mode 100644 index 00000000..874c96c9 --- /dev/null +++ b/src/app/spreed-webrtc-server/room_manager.go @@ -0,0 +1,170 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "log" + "sync" +) + +type RoomStatusManager interface { + CanJoinRoom(id string) bool + RoomUsers(*Session) []*DataSession + JoinRoom(*Session, Sender) + LeaveRoom(*Session) +} + +type Broadcaster interface { + Broadcast(*Session, interface{}) +} + +type RoomStats interface { + RoomInfo(includeSessions bool) (count int, sessionInfo map[string][]string) +} + +type RoomManager interface { + RoomStatusManager + Broadcaster + RoomStats +} + +type roomManager struct { + sync.RWMutex + OutgoingEncoder + defaultRoomEnabled bool + globalRoomID string + roomTable map[string]RoomWorker +} + +func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager { + return &roomManager{ + sync.RWMutex{}, + encoder, + config.DefaultRoomEnabled, + config.globalRoomid, + make(map[string]RoomWorker), + } +} + +func (rooms *roomManager) CanJoinRoom(id string) bool { + return id != "" || rooms.defaultRoomEnabled +} + +func (rooms *roomManager) RoomUsers(session *Session) []*DataSession { + return <-rooms.getRoomWorker(session.Roomid).GetUsers() +} + +func (rooms *roomManager) JoinRoom(session *Session, sender Sender) { + rooms.getRoomWorker(session.Roomid).Join(session, sender) +} + +func (rooms *roomManager) LeaveRoom(session *Session) { + rooms.getRoomWorker(session.Roomid).Leave(session) +} + +func (rooms *roomManager) Broadcast(session *Session, m interface{}) { + outgoing := &DataOutgoing{ + From: session.Id, + A: session.Attestation(), + Data: m, + } + + message, err := rooms.EncodeOutgoing(outgoing) + if err != nil { + return + } + + id := session.Roomid + if id != "" && id == rooms.globalRoomID { + rooms.RLock() + for _, room := range rooms.roomTable { + room.Broadcast(session, message) + } + rooms.RUnlock() + } else { + room := rooms.getRoomWorker(id) + room.Broadcast(session, message) + } + message.Decref() +} + +func (rooms *roomManager) RoomInfo(includeSessions bool) (count int, sessionInfo map[string][]string) { + rooms.RLock() + defer rooms.RUnlock() + + count = len(rooms.roomTable) + if includeSessions { + sessionInfo := make(map[string][]string) + for roomid, room := range rooms.roomTable { + sessionInfo[roomid] = room.SessionIDs() + } + } + return +} + +func (rooms *roomManager) getRoomWorker(id string) RoomWorker { + + rooms.RLock() + room, ok := rooms.roomTable[id] + if !ok { + rooms.RUnlock() + rooms.Lock() + // Need to re-check, another thread might have created the room + // while we waited for the lock. + room, ok = rooms.roomTable[id] + if !ok { + room = NewRoomWorker(rooms, id) + rooms.roomTable[id] = room + rooms.Unlock() + go func() { + // Start room, this blocks until room expired. + room.Start() + // Cleanup room when we are done. + rooms.Lock() + defer rooms.Unlock() + delete(rooms.roomTable, id) + log.Printf("Cleaned up room '%s'\n", id) + }() + } else { + rooms.Unlock() + } + } else { + rooms.RUnlock() + } + + return room + +} + +func (rooms *roomManager) GlobalUsers() []*roomUser { + if rooms.globalRoomID == "" { + return make([]*roomUser, 0) + } + rooms.RLock() + if room, ok := rooms.roomTable[rooms.globalRoomID]; ok { + rooms.RUnlock() + return room.Users() + } + + rooms.RUnlock() + return make([]*roomUser, 0) +} diff --git a/src/app/spreed-webrtc-server/roomworker.go b/src/app/spreed-webrtc-server/roomworker.go index b671def7..df7e1feb 100644 --- a/src/app/spreed-webrtc-server/roomworker.go +++ b/src/app/spreed-webrtc-server/roomworker.go @@ -22,7 +22,6 @@ package main import ( - "encoding/json" "log" "sync" "time" @@ -33,39 +32,47 @@ const ( roomExpiryDuration = 60 * time.Second ) -type RoomConnectionUpdate struct { - Id string - Sessionid string - Status bool - Connection *Connection +type RoomWorker interface { + Start() + SessionIDs() []string + Users() []*roomUser + GetUsers() <-chan []*DataSession + Broadcast(*Session, Buffer) + Join(*Session, Sender) + Leave(*Session) } -type RoomWorker struct { +type roomWorker struct { // References. - h *Hub + manager *roomManager // Data handling. - workers chan (func()) - expired chan (bool) - connections map[string]*Connection - timer *time.Timer - mutex sync.RWMutex + workers chan (func()) + expired chan (bool) + users map[string]*roomUser + timer *time.Timer + mutex sync.RWMutex // Metadata. Id string } -func NewRoomWorker(h *Hub, id string) *RoomWorker { +type roomUser struct { + *Session + Sender +} + +func NewRoomWorker(manager *roomManager, id string) RoomWorker { log.Printf("Creating worker for room '%s'\n", id) - r := &RoomWorker{ - h: h, - Id: id, + r := &roomWorker{ + manager: manager, + Id: id, + workers: make(chan func(), roomMaxWorkers), + expired: make(chan bool), + users: make(map[string]*roomUser), } - r.workers = make(chan func(), roomMaxWorkers) - r.expired = make(chan bool) - r.connections = make(map[string]*Connection) // Create expire timer. r.timer = time.AfterFunc(roomExpiryDuration, func() { @@ -76,7 +83,7 @@ func NewRoomWorker(h *Hub, id string) *RoomWorker { } -func (r *RoomWorker) Start() { +func (r *roomWorker) Start() { // Main blocking worker. L: @@ -90,7 +97,7 @@ L: //fmt.Println("Work room expired", r.Id) //fmt.Println("Work room expired", r.Id, len(r.connections)) r.mutex.RLock() - if len(r.connections) == 0 { + if len(r.users) == 0 { // Cleanup room when it is empty. r.mutex.RUnlock() log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id) @@ -107,19 +114,29 @@ L: } -func (r *RoomWorker) GetConnections() []*Connection { +func (r *roomWorker) SessionIDs() []string { + r.mutex.RLock() + defer r.mutex.RUnlock() + sessions := make([]string, 0, len(r.users)) + for id := range r.users { + sessions = append(sessions, id) + } + return sessions +} + +func (r *roomWorker) Users() []*roomUser { r.mutex.RLock() defer r.mutex.RUnlock() - connections := make([]*Connection, 0, len(r.connections)) - for _, connection := range r.connections { - connections = append(connections, connection) + users := make([]*roomUser, 0, len(r.users)) + for _, user := range r.users { + users = append(users, user) } - return connections + return users } -func (r *RoomWorker) Run(f func()) bool { +func (r *roomWorker) Run(f func()) bool { select { case r.workers <- f: @@ -131,13 +148,12 @@ func (r *RoomWorker) Run(f func()) bool { } -func (r *RoomWorker) usersHandler(c *Connection) { - +func (r *roomWorker) GetUsers() <-chan []*DataSession { + out := make(chan []*DataSession, 1) worker := func() { - sessions := &DataSessions{Type: "Users"} var sl []*DataSession - appender := func(ec *Connection) bool { - ecsession := ec.Session + appender := func(user *roomUser) bool { + ecsession := user.Session if ecsession != nil { session := ecsession.Data() session.Type = "Online" @@ -150,73 +166,65 @@ func (r *RoomWorker) usersHandler(c *Connection) { return true } r.mutex.RLock() - sl = make([]*DataSession, 0, len(r.connections)) + sl = make([]*DataSession, 0, len(r.users)) // Include connections in this room. - for _, ec := range r.connections { - if !appender(ec) { + for _, user := range r.users { + if !appender(user) { break } } r.mutex.RUnlock() // Include connections to global room. - for _, ec := range c.h.GetGlobalConnections() { + for _, ec := range r.manager.GlobalUsers() { if !appender(ec) { break } } - sessions.Users = sl - sessionsJson := c.h.buffers.New() - encoder := json.NewEncoder(sessionsJson) - err := encoder.Encode(&DataOutgoing{From: c.Id, Data: sessions}) - if err != nil { - log.Println("Users error while encoding JSON", err) - sessionsJson.Decref() - return - } - c.send(sessionsJson) - sessionsJson.Decref() + out <- sl } r.Run(worker) - + return out } -func (r *RoomWorker) broadcastHandler(m *MessageRequest) { +func (r *roomWorker) Broadcast(session *Session, message Buffer) { worker := func() { r.mutex.RLock() defer r.mutex.RUnlock() - for id, ec := range r.connections { - if id == m.From { + for id, user := range r.users { + if id == session.Id { // Skip broadcast to self. continue } //fmt.Printf("%s\n", m.Message) - ec.send(m.Message) + user.Send(message) } - m.Message.Decref() + message.Decref() } - m.Message.Incref() + message.Incref() r.Run(worker) } -func (r *RoomWorker) connectionHandler(rcu *RoomConnectionUpdate) { +func (r *roomWorker) Join(session *Session, sender Sender) { + worker := func() { + r.mutex.Lock() + defer r.mutex.Unlock() + r.users[session.Id] = &roomUser{session, sender} + } + r.Run(worker) +} +func (r *roomWorker) Leave(session *Session) { worker := func() { r.mutex.Lock() defer r.mutex.Unlock() - if rcu.Status { - r.connections[rcu.Sessionid] = rcu.Connection - } else { - if _, ok := r.connections[rcu.Sessionid]; ok { - delete(r.connections, rcu.Sessionid) - } + if _, ok := r.users[session.Id]; ok { + delete(r.users, session.Id) } } - r.Run(worker) - } diff --git a/src/app/spreed-webrtc-server/server.go b/src/app/spreed-webrtc-server/server.go deleted file mode 100644 index 9ea70ad2..00000000 --- a/src/app/spreed-webrtc-server/server.go +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Spreed WebRTC. - * Copyright (C) 2013-2014 struktur AG - * - * This file is part of Spreed WebRTC. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -package main - -import ( - "encoding/json" - "log" - "sync/atomic" - "time" -) - -const ( - maxConferenceSize = 100 -) - -type Server struct { -} - -func (s *Server) OnRegister(c *Connection) { - //log.Println("OnRegister", c.id) - if token, err := c.h.EncodeSessionToken(c.Session.Token()); err == nil { - log.Println("Created new session token", len(token), token) - // Send stuff back. - s.Unicast(c, c.Id, &DataSelf{ - Type: "Self", - Id: c.Id, - Sid: c.Session.Sid, - Userid: c.Session.Userid(), - Suserid: c.h.CreateSuserid(c.Session), - Token: token, - Version: c.h.version, - Turn: c.h.CreateTurnData(c.Id), - Stun: c.h.config.StunURIs, - }) - } else { - log.Println("Error in OnRegister", c.Idx, err) - } -} - -func (s *Server) OnUnregister(c *Connection) { - //log.Println("OnUnregister", c.id) - dsl := c.Session.DataSessionLeft("hard") - if c.Hello { - s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid}) - s.Broadcast(c, dsl) - } - c.Session.RunForAllSubscribers(func(session *Session) { - log.Println("Notifying subscriber that we are gone", c.Id, session.Id) - s.Unicast(c, session.Id, dsl) - }) -} - -func (s *Server) OnText(c *Connection, b Buffer) { - - //log.Printf("OnText from %d: %s\n", c.Id, b) - var msg DataIncoming - err := json.Unmarshal(b.Bytes(), &msg) - if err != nil { - log.Println("OnText error while decoding JSON", err) - log.Printf("JSON:\n%s\n", b) - return - } - - switch msg.Type { - case "Self": - s.OnRegister(c) - case "Hello": - //log.Println("Hello", msg.Hello, c.Idx) - // TODO(longsleep): Filter room id and user agent. - s.UpdateSession(c, &SessionUpdate{Types: []string{"Roomid", "Ua"}, Roomid: msg.Hello.Id, Ua: msg.Hello.Ua}) - if c.Hello && c.Roomid != msg.Hello.Id { - // Room changed. - s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid}) - s.Broadcast(c, c.Session.DataSessionLeft("soft")) - } - c.Roomid = msg.Hello.Id - if c.h.config.DefaultRoomEnabled || !c.h.isDefaultRoomid(c.Roomid) { - c.Hello = true - s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid, Status: true}) - s.Broadcast(c, c.Session.DataSessionJoined()) - } else { - c.Hello = false - } - case "Offer": - // TODO(longsleep): Validate offer - s.Unicast(c, msg.Offer.To, msg.Offer) - case "Candidate": - // TODO(longsleep): Validate candidate - s.Unicast(c, msg.Candidate.To, msg.Candidate) - case "Answer": - // TODO(longsleep): Validate Answer - s.Unicast(c, msg.Answer.To, msg.Answer) - case "Users": - if c.Hello { - s.Users(c) - } - case "Authentication": - if msg.Authentication.Authentication != nil && s.Authenticate(c, msg.Authentication.Authentication) { - s.OnRegister(c) - if c.Hello { - s.Broadcast(c, c.Session.DataSessionStatus()) - } - } - case "Bye": - s.Unicast(c, msg.Bye.To, msg.Bye) - case "Status": - //log.Println("Status", msg.Status) - s.UpdateSession(c, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) - if c.Hello { - s.Broadcast(c, c.Session.DataSessionStatus()) - } - case "Chat": - // TODO(longsleep): Limit sent chat messages per incoming connection. - if !msg.Chat.Chat.NoEcho { - s.Unicast(c, c.Id, msg.Chat) - } - msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) - if msg.Chat.To == "" { - // TODO(longsleep): Check if chat broadcast is allowed. - if c.Hello { - atomic.AddUint64(&c.h.broadcastChatMessages, 1) - s.Broadcast(c, msg.Chat) - } - } else { - if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { - err = s.ContactRequest(c, msg.Chat.To, msg.Chat.Chat.Status.ContactRequest) - if err != nil { - log.Println("Ignoring invalid contact request.", err) - return - } - msg.Chat.Chat.Status.ContactRequest.Userid = c.Session.Userid() - } - atomic.AddUint64(&c.h.unicastChatMessages, 1) - s.Unicast(c, msg.Chat.To, msg.Chat) - if msg.Chat.Chat.Mid != "" { - // Send out delivery confirmation status chat message. - s.Unicast(c, c.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) - } - } - case "Conference": - // Check conference maximum size. - if len(msg.Conference.Conference) > maxConferenceSize { - log.Println("Refusing to create conference above limit.", len(msg.Conference.Conference)) - } else { - // Send conference update to anyone. - for _, id := range msg.Conference.Conference { - if id != c.Id { - //log.Println("participant", id) - s.Unicast(c, id, msg.Conference) - } - } - } - case "Alive": - s.Alive(c, msg.Alive, msg.Iid) - case "Sessions": - s.Sessions(c, msg.Sessions.Sessions, msg.Iid) - default: - log.Println("OnText unhandled message type", msg.Type) - } - -} - -func (s *Server) Unicast(c *Connection, to string, m interface{}) { - - outgoing := &DataOutgoing{From: c.Id, To: to, Data: m} - if !c.isClosing && c.Id != to { - outgoing.A = c.Session.Attestation() - } - b := c.h.buffers.New() - encoder := json.NewEncoder(b) - err := encoder.Encode(outgoing) - if err != nil { - b.Decref() - log.Println("Unicast error while encoding JSON", err) - return - } - //log.Println("Unicast", b) - - var msg = &MessageRequest{From: c.Id, To: to, Message: b} - c.h.unicastHandler(msg) - b.Decref() -} - -func (s *Server) Broadcast(c *Connection, m interface{}) { - - b := c.h.buffers.New() - encoder := json.NewEncoder(b) - err := encoder.Encode(&DataOutgoing{From: c.Id, Data: m, A: c.Session.Attestation()}) - if err != nil { - b.Decref() - log.Println("Broadcast error while encoding JSON", err) - return - } - - if c.h.isGlobalRoomid(c.Roomid) { - c.h.RunForAllRooms(func(room *RoomWorker) { - var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id} - room.broadcastHandler(msg) - }) - } else { - var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid} - room := c.h.GetRoom(c.Roomid) - room.broadcastHandler(msg) - } - b.Decref() - -} - -func (s *Server) Alive(c *Connection, alive *DataAlive, iid string) { - - c.h.aliveHandler(c, alive, iid) - -} - -func (s *Server) Sessions(c *Connection, srq *DataSessionsRequest, iid string) { - - c.h.sessionsHandler(c, srq, iid) - -} - -func (s *Server) UpdateSession(c *Connection, su *SessionUpdate) uint64 { - - su.Id = c.Id - return c.h.sessionupdateHandler(su) - -} - -func (s *Server) ContactRequest(c *Connection, to string, cr *DataContactRequest) (err error) { - - return c.h.contactrequestHandler(c, to, cr) - -} - -func (s *Server) Users(c *Connection) { - - room := c.h.GetRoom(c.Roomid) - room.usersHandler(c) - -} - -func (s *Server) Authenticate(c *Connection, st *SessionToken) bool { - - err := c.h.authenticateHandler(c.Session, st, "") - if err == nil { - log.Println("Authentication success", c.Id, c.Idx, c.Session.Userid) - return true - } else { - log.Println("Authentication failed", err, c.Id, c.Idx, st.Userid, st.Nonce) - return false - } - -} - -func (s *Server) UpdateRoomConnection(c *Connection, rcu *RoomConnectionUpdate) { - - rcu.Sessionid = c.Id - rcu.Connection = c - room := c.h.GetRoom(c.Roomid) - room.connectionHandler(rcu) - -} diff --git a/src/app/spreed-webrtc-server/session.go b/src/app/spreed-webrtc-server/session.go index ccb8bacd..92623b16 100644 --- a/src/app/spreed-webrtc-server/session.go +++ b/src/app/spreed-webrtc-server/session.go @@ -39,26 +39,28 @@ type Session struct { Status interface{} Nonce string Prio int + Hello bool + Roomid string mutex sync.RWMutex userid string fake bool stamp int64 attestation *SessionAttestation + attestations *securecookie.SecureCookie subscriptions map[string]*Session subscribers map[string]*Session - h *Hub } -func NewSession(h *Hub, id, sid string) *Session { +func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session { session := &Session{ Id: id, Sid: sid, Prio: 100, stamp: time.Now().Unix(), + attestations: attestations, subscriptions: make(map[string]*Session), subscribers: make(map[string]*Session), - h: h, } session.NewAttestation() return session @@ -288,35 +290,27 @@ func (s *Session) DataSessionStatus() *DataSession { } func (s *Session) NewAttestation() { - s.attestation = &SessionAttestation{ s: s, } s.attestation.Update() - } func (s *Session) Attestation() (attestation string) { - s.mutex.RLock() attestation = s.attestation.Token() s.mutex.RUnlock() return - } func (s *Session) UpdateAttestation() { - s.mutex.Lock() s.attestation.Update() s.mutex.Unlock() - } type SessionUpdate struct { - Id string Types []string - Roomid string Ua string Prio int Status interface{} @@ -336,39 +330,31 @@ type SessionAttestation struct { } func (sa *SessionAttestation) Update() (string, error) { - token, err := sa.Encode() if err == nil { sa.token = token sa.refresh = time.Now().Unix() + 180 // expires after 3 minutes } return token, err - } func (sa *SessionAttestation) Token() (token string) { - if sa.refresh < time.Now().Unix() { token, _ = sa.Update() } else { token = sa.token } return - } func (sa *SessionAttestation) Encode() (string, error) { - - return sa.s.h.attestations.Encode("attestation", sa.s.Id) - + return sa.s.attestations.Encode("attestation", sa.s.Id) } func (sa *SessionAttestation) Decode(token string) (string, error) { - var id string - err := sa.s.h.attestations.Decode("attestation", token, &id) + err := sa.s.attestations.Decode("attestation", token, &id) return id, err - } func init() { diff --git a/src/app/spreed-webrtc-server/session_manager.go b/src/app/spreed-webrtc-server/session_manager.go new file mode 100644 index 00000000..28019253 --- /dev/null +++ b/src/app/spreed-webrtc-server/session_manager.go @@ -0,0 +1,173 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "crypto/sha256" + "net/http" + "sync" + + "github.com/gorilla/securecookie" +) + +type UserStats interface { + UserInfo(bool) (int, map[string]*DataUser) +} + +type SessionManager interface { + UserStats + RetrieveUsersWith(func(*http.Request) (string, error)) + CreateSession(*http.Request) *Session + DestroySession(*Session) + Authenticate(*Session, *SessionToken, string) error + GetUserSessions(session *Session, id string) []*DataSession +} + +type sessionManager struct { + Tickets + sync.RWMutex + config *Config + userTable map[string]*User + fakesessionTable map[string]*Session + useridRetriever func(*http.Request) (string, error) + attestations *securecookie.SecureCookie +} + +func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager { + sessionManager := &sessionManager{ + tickets, + sync.RWMutex{}, + config, + make(map[string]*User), + make(map[string]*Session), + nil, + nil, + } + + sessionManager.attestations = securecookie.New(sessionSecret, nil) + sessionManager.attestations.MaxAge(300) // 5 minutes + sessionManager.attestations.HashFunc(sha256.New) + + return sessionManager +} + +func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, users map[string]*DataUser) { + sessionManager.RLock() + defer sessionManager.RUnlock() + + userCount = len(sessionManager.userTable) + if details { + users := make(map[string]*DataUser) + for userid, user := range sessionManager.userTable { + users[userid] = user.Data() + } + } + return +} + +func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) { + sessionManager.useridRetriever = retriever +} + +func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session { + request.ParseForm() + token := request.FormValue("t") + st := sessionManager.DecodeSessionToken(token) + + var userid string + if sessionManager.config.UsersEnabled { + if sessionManager.useridRetriever != nil { + userid, _ = sessionManager.useridRetriever(request) + if userid == "" { + userid = st.Userid + } + } + } + + session := NewSession(sessionManager.attestations, st.Id, st.Sid) + + if userid != "" { + // XXX(lcooper): Should errors be handled here? + sessionManager.Authenticate(session, st, userid) + } + + return session +} + +func (sessionManager *sessionManager) DestroySession(session *Session) { + session.Close() + + sessionManager.Lock() + if suserid := session.Userid(); suserid != "" { + user, ok := sessionManager.userTable[suserid] + if ok && user.RemoveSession(session) { + delete(sessionManager.userTable, suserid) + } + } + sessionManager.Unlock() +} + +func (sessionManager *sessionManager) Authenticate(session *Session, st *SessionToken, userid string) error { + if err := session.Authenticate(sessionManager.Realm(), st, userid); err != nil { + return err + } + + // Authentication success. + suserid := session.Userid() + sessionManager.Lock() + user, ok := sessionManager.userTable[suserid] + if !ok { + user = NewUser(suserid) + sessionManager.userTable[suserid] = user + } + sessionManager.Unlock() + user.AddSession(session) + return nil +} + +func (sessionManager *sessionManager) GetUserSessions(session *Session, userid string) (users []*DataSession) { + var ( + user *User + ok bool + ) + sessionManager.RLock() + user, ok = sessionManager.userTable[userid] + sessionManager.RUnlock() + if !ok { + // No user. Create fake session. + sessionManager.Lock() + session, ok := sessionManager.fakesessionTable[userid] + if !ok { + st := sessionManager.FakeSessionToken(userid) + session = NewSession(sessionManager.attestations, st.Id, st.Sid) + session.SetUseridFake(st.Userid) + sessionManager.fakesessionTable[userid] = session + } + sessionManager.Unlock() + users = make([]*DataSession, 1, 1) + users[0] = session.Data() + } else { + // Add sessions for foreign user. + users = user.SubscribeSessions(session) + } + return +} diff --git a/src/app/spreed-webrtc-server/sessions.go b/src/app/spreed-webrtc-server/sessions.go index 467dea2e..6ea0b452 100644 --- a/src/app/spreed-webrtc-server/sessions.go +++ b/src/app/spreed-webrtc-server/sessions.go @@ -23,6 +23,7 @@ package main import ( "encoding/json" + "errors" "github.com/gorilla/mux" "log" "net/http" @@ -42,7 +43,8 @@ type SessionNonceRequest struct { } type Sessions struct { - hub *Hub + SessionValidator + SessionStore users *Users } @@ -78,7 +80,7 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H } // Make sure Sid matches session and is valid. - if !sessions.hub.ValidateSession(snr.Id, snr.Sid) { + if !sessions.ValidateSession(snr.Id, snr.Sid) { log.Println("Session patch failed - validation failed.") error = true } @@ -104,7 +106,12 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H var nonce string if !error { // FIXME(longsleep): Not running this might reveal error state with a timing attack. - nonce, err = sessions.hub.sessiontokenHandler(&SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) + if session, ok := sessions.GetSession(snr.Id); ok { + nonce, err = session.Authorize(sessions.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) + } else { + err = errors.New("no such session") + } + if err != nil { log.Println("Session patch failed - handle failed.", err) error = true diff --git a/src/app/spreed-webrtc-server/stats.go b/src/app/spreed-webrtc-server/stats.go index cf344a25..ac9ac0e5 100644 --- a/src/app/spreed-webrtc-server/stats.go +++ b/src/app/spreed-webrtc-server/stats.go @@ -33,11 +33,11 @@ type Stat struct { Hub *HubStat `json:"hub"` } -func NewStat(details bool, h *Hub) *Stat { +func NewStat(details bool, statsGenerator StatsGenerator) *Stat { stat := &Stat{ details: details, Runtime: &RuntimeStat{}, - Hub: h.Stat(details), + Hub: statsGenerator.Stat(details), } stat.Runtime.Read() return stat @@ -69,12 +69,12 @@ func (stat *RuntimeStat) Read() { } type Stats struct { - hub *Hub + StatsGenerator } func (stats *Stats) Get(request *http.Request) (int, interface{}, http.Header) { details := request.Form.Get("details") == "1" - return 200, NewStat(details, stats.hub), http.Header{"Content-Type": {"application/json; charset=utf-8"}, "Access-Control-Allow-Origin": {"*"}} + return 200, NewStat(details, stats), http.Header{"Content-Type": {"application/json; charset=utf-8"}, "Access-Control-Allow-Origin": {"*"}} } diff --git a/src/app/spreed-webrtc-server/stats_manager.go b/src/app/spreed-webrtc-server/stats_manager.go new file mode 100644 index 00000000..59163839 --- /dev/null +++ b/src/app/spreed-webrtc-server/stats_manager.go @@ -0,0 +1,104 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "sync/atomic" +) + +type HubStat struct { + Rooms int `json:"rooms"` + Connections int `json:"connections"` + Sessions int `json:"sessions"` + Users int `json:"users"` + Count uint64 `json:"count"` + BroadcastChatMessages uint64 `json:"broadcastchatmessages"` + UnicastChatMessages uint64 `json:"unicastchatmessages"` + IdsInRoom map[string][]string `json:"idsinroom,omitempty"` + SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"` + UsersById map[string]*DataUser `json:"usersbyid,omitempty"` + ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"` +} + +type ConnectionCounter interface { + CountConnection() uint64 +} + +type StatsCounter interface { + CountBroadcastChat() + CountUnicastChat() +} + +type StatsGenerator interface { + Stat(details bool) *HubStat +} + +type StatsManager interface { + ConnectionCounter + StatsCounter + StatsGenerator +} + +type statsManager struct { + ClientStats + RoomStats + UserStats + connectionCount uint64 + broadcastChatMessages uint64 + unicastChatMessages uint64 +} + +func NewStatsManager(clientStats ClientStats, roomStats RoomStats, userStats UserStats) StatsManager { + return &statsManager{clientStats, roomStats, userStats, 0, 0, 0} +} + +func (stats *statsManager) CountConnection() uint64 { + return atomic.AddUint64(&stats.connectionCount, 1) +} + +func (stats *statsManager) CountBroadcastChat() { + atomic.AddUint64(&stats.broadcastChatMessages, 1) +} + +func (stats *statsManager) CountUnicastChat() { + atomic.AddUint64(&stats.unicastChatMessages, 1) +} + +func (stats *statsManager) Stat(details bool) *HubStat { + roomCount, roomSessionInfo := stats.RoomInfo(details) + clientCount, sessions, connections := stats.ClientInfo(details) + userCount, users := stats.UserInfo(details) + + return &HubStat{ + Rooms: roomCount, + Connections: clientCount, + Sessions: clientCount, + Users: userCount, + Count: atomic.LoadUint64(&stats.connectionCount), + BroadcastChatMessages: atomic.LoadUint64(&stats.broadcastChatMessages), + UnicastChatMessages: atomic.LoadUint64(&stats.unicastChatMessages), + IdsInRoom: roomSessionInfo, + SessionsById: sessions, + UsersById: users, + ConnectionsByIdx: connections, + } +} diff --git a/src/app/spreed-webrtc-server/tickets.go b/src/app/spreed-webrtc-server/tickets.go new file mode 100644 index 00000000..b44f8bef --- /dev/null +++ b/src/app/spreed-webrtc-server/tickets.go @@ -0,0 +1,130 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "crypto/aes" + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "fmt" + "log" + + "github.com/gorilla/securecookie" +) + +type SessionValidator interface { + Realm() string + ValidateSession(string, string) bool +} + +type SessionEncoder interface { + EncodeSessionToken(*Session) (string, error) + EncodeSessionUserID(*Session) string +} + +type Tickets interface { + SessionValidator + SessionEncoder + DecodeSessionToken(token string) (st *SessionToken) + FakeSessionToken(userid string) *SessionToken +} + +type tickets struct { + *securecookie.SecureCookie + realm string + tokenName string + encryptionSecret []byte +} + +func NewTickets(sessionSecret, encryptionSecret []byte, realm string) Tickets { + tickets := &tickets{ + nil, + realm, + fmt.Sprintf("token@%s", realm), + encryptionSecret, + } + tickets.SecureCookie = securecookie.New(sessionSecret, encryptionSecret) + tickets.MaxAge(86400 * 30) // 30 days + tickets.HashFunc(sha256.New) + tickets.BlockFunc(aes.NewCipher) + + return tickets +} + +func (tickets *tickets) Realm() string { + return tickets.realm +} + +func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) { + var err error + if token != "" { + st = &SessionToken{} + err = tickets.Decode(tickets.tokenName, token, st) + if err != nil { + log.Println("Error while decoding session token", err) + } + } + + if st == nil || err != nil { + sid := NewRandomString(32) + id, _ := tickets.Encode("id", sid) + st = &SessionToken{Id: id, Sid: sid} + log.Println("Created new session id", id) + } + return +} + +func (tickets *tickets) FakeSessionToken(userid string) *SessionToken { + st := &SessionToken{} + st.Sid = fmt.Sprintf("fake-%s", NewRandomString(27)) + st.Id, _ = tickets.Encode("id", st.Sid) + st.Userid = userid + log.Println("Created new fake session id", st.Id) + return st +} + +func (tickets *tickets) ValidateSession(id, sid string) bool { + var decoded string + if err := tickets.Decode("id", id, &decoded); err != nil { + log.Println("Session validation error", err, id, sid) + return false + } + if decoded != sid { + log.Println("Session validation failed", id, sid) + return false + } + return true +} + +func (tickets *tickets) EncodeSessionToken(session *Session) (string, error) { + return tickets.Encode(tickets.tokenName, session.Token()) +} + +func (tickets *tickets) EncodeSessionUserID(session *Session) (suserid string) { + if userid := session.Userid(); userid != "" { + m := hmac.New(sha256.New, tickets.encryptionSecret) + m.Write([]byte(userid)) + suserid = base64.StdEncoding.EncodeToString(m.Sum(nil)) + } + return +} diff --git a/src/app/spreed-webrtc-server/users.go b/src/app/spreed-webrtc-server/users.go index ed70687f..c10fa512 100644 --- a/src/app/spreed-webrtc-server/users.go +++ b/src/app/spreed-webrtc-server/users.go @@ -291,16 +291,21 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) { } type Users struct { - hub *Hub + SessionValidator + SessionManager + SessionStore realm string handler UsersHandler } -func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users { +func NewUsers(sessionStore SessionStore, sessionValidator SessionValidator, sessionManager SessionManager, mode, realm string, runtime phoenix.Runtime) *Users { var users = &Users{ - hub: hub, - realm: realm, + sessionValidator, + sessionManager, + sessionStore, + realm, + nil, } var handler UsersHandler @@ -309,8 +314,8 @@ func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users { // Create handler based on mode. if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil { users.handler = handler - // Register handler Get at the hub. - users.hub.useridRetriever = func(request *http.Request) (userid string, err error) { + // Register handler Get. + sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) { userid, err = handler.Get(request) if err != nil { log.Printf("Failed to get userid from handler: %s", err) @@ -320,7 +325,7 @@ func NewUsers(hub *Hub, mode, realm string, runtime phoenix.Runtime) *Users { } } return - } + }) log.Printf("Enabled users handler '%s'\n", mode) } else if err != nil { log.Printf("Failed to enable handler '%s': %s\n", mode, err) @@ -450,11 +455,20 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) userid := fmt.Sprintf("%s@%s", uuid.NewV4().String(), users.realm) // Make sure Sid matches session and is valid. - if !users.hub.ValidateSession(snr.Id, snr.Sid) { + if !users.ValidateSession(snr.Id, snr.Sid) { return 403, NewApiError("users_invalid_session", "Invalid session"), http.Header{"Content-Type": {"application/json"}} } - nonce, err := users.hub.sessiontokenHandler(&SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) + var ( + nonce string + err error + ) + if session, ok := users.GetSession(snr.Id); ok { + nonce, err = session.Authorize(users.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) + } else { + err = errors.New("no such session") + } + if err != nil { return 400, NewApiError("users_request_failed", fmt.Sprintf("Error: %q", err)), http.Header{"Content-Type": {"application/json"}} } diff --git a/src/app/spreed-webrtc-server/ws.go b/src/app/spreed-webrtc-server/ws.go index 800b9165..9bba4bfc 100644 --- a/src/app/spreed-webrtc-server/ws.go +++ b/src/app/spreed-webrtc-server/ws.go @@ -22,9 +22,10 @@ package main import ( - "github.com/gorilla/websocket" "log" "net/http" + + "github.com/gorilla/websocket" ) const ( @@ -49,10 +50,8 @@ var ( } ) -func makeWsHubHandler(h *Hub) http.HandlerFunc { - +func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionManager, codec Codec, channellingAPI ChannellingAPI) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - // Validate incoming request. if r.Method != "GET" { w.WriteHeader(http.StatusMethodNotAllowed) @@ -68,30 +67,14 @@ func makeWsHubHandler(h *Hub) http.HandlerFunc { return } - // Read request details. - r.ParseForm() - token := r.FormValue("t") - // Create a new connection instance. - c := NewConnection(h, ws, r) - if token != "" { - if err := c.reregister(token); err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } else { - if err := c.register(); err != nil { - log.Println(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } + session := sessionManager.CreateSession(r) + defer sessionManager.DestroySession(session) + client := NewClient(codec, channellingAPI, session) + conn := NewConnection(connectionCounter.CountConnection(), ws, client) // Start pumps (readPump blocks). - go c.writePump() - c.readPump() - + go conn.writePump() + conn.readPump() } - }