From 6bcd814c1fb11d12d335410639ac8686787f4515 Mon Sep 17 00:00:00 2001 From: Lance Cooper Date: Mon, 26 Jan 2015 22:29:04 -0600 Subject: [PATCH] Move all session cleanup code to a single code path. This should resolve any cases where sessions were not removed from the hub after their connection closed. Also moves buddy image management entirely into the session and vastly improves locking around session data. Also allocates less when sending client left messages and paves the way for session specific message codecs. --- .../spreed-webrtc-server/channelling_api.go | 95 ++------ .../channelling_api_test.go | 18 +- src/app/spreed-webrtc-server/client.go | 6 +- src/app/spreed-webrtc-server/connection.go | 12 +- src/app/spreed-webrtc-server/hub.go | 22 +- src/app/spreed-webrtc-server/main.go | 4 +- src/app/spreed-webrtc-server/room_manager.go | 29 +-- src/app/spreed-webrtc-server/roomworker.go | 18 +- src/app/spreed-webrtc-server/session.go | 212 +++++++++++------- .../spreed-webrtc-server/session_manager.go | 34 +-- src/app/spreed-webrtc-server/user.go | 4 +- src/app/spreed-webrtc-server/ws.go | 2 +- 12 files changed, 220 insertions(+), 236 deletions(-) diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go index 1c2af420..96ea1db8 100644 --- a/src/app/spreed-webrtc-server/channelling_api.go +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -23,7 +23,6 @@ package main import ( "log" - "strings" "time" ) @@ -34,7 +33,6 @@ const ( type ChannellingAPI interface { OnConnect(Client, *Session) OnIncoming(ResponseSender, *Session, *DataIncoming) - OnDisconnect(*Session) } type channellingAPI struct { @@ -46,11 +44,9 @@ type channellingAPI struct { ContactManager TurnDataCreator Unicaster - Broadcaster - buddyImages ImageCache } -func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI { +func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI { return &channellingAPI{ config, roomStatus, @@ -60,8 +56,6 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco contactManager, turnDataCreator, unicaster, - broadcaster, - buddyImages, } } @@ -77,40 +71,31 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D case "Hello": //log.Println("Hello", msg.Hello, c.Index()) // TODO(longsleep): Filter room id and user agent. - api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) - if session.Hello && session.Roomid != msg.Hello.Id { - api.LeaveRoom(session) - api.Broadcast(session, session.DataSessionLeft("soft")) - } + session.Update(&SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) + room, err := session.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, c) // NOTE(lcooper): Iid filtered for compatibility's sake. // Evaluate sending unconditionally when supported by all clients. - if room, err := api.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, session, c); err == nil { - session.Hello = true - session.Roomid = msg.Hello.Id - if msg.Iid != "" { + if msg.Iid != "" { + if err == nil { c.Reply(msg.Iid, &DataWelcome{ Type: "Welcome", Room: room, Users: api.RoomUsers(session), }) - } - api.Broadcast(session, session.DataSessionJoined()) - } else { - session.Hello = false - if msg.Iid != "" { + } else { c.Reply(msg.Iid, err) } } case "Offer": // TODO(longsleep): Validate offer - api.Unicast(session, msg.Offer.To, msg.Offer) + session.Unicast(msg.Offer.To, msg.Offer) case "Candidate": // TODO(longsleep): Validate candidate - api.Unicast(session, msg.Candidate.To, msg.Candidate) + session.Unicast(msg.Candidate.To, msg.Candidate) case "Answer": // TODO(longsleep): Validate Answer - api.Unicast(session, msg.Answer.To, msg.Answer) + session.Unicast(msg.Answer.To, msg.Answer) case "Users": if session.Hello { sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)} @@ -125,27 +110,27 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D if err := api.Authenticate(session, st, ""); err == nil { log.Println("Authentication success", session.Userid) api.SendSelf(c, session) - api.BroadcastSessionStatus(session) + session.BroadcastStatus() } else { log.Println("Authentication failed", err, st.Userid, st.Nonce) } case "Bye": - api.Unicast(session, msg.Bye.To, msg.Bye) + session.Unicast(msg.Bye.To, msg.Bye) case "Status": //log.Println("Status", msg.Status) - api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) - api.BroadcastSessionStatus(session) + session.Update(&SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) + session.BroadcastStatus() case "Chat": // TODO(longsleep): Limit sent chat messages per incoming connection. if !msg.Chat.Chat.NoEcho { - api.Unicast(session, session.Id, msg.Chat) + session.Unicast(session.Id, msg.Chat) } msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) if msg.Chat.To == "" { // TODO(longsleep): Check if chat broadcast is allowed. if session.Hello { api.CountBroadcastChat() - api.Broadcast(session, msg.Chat) + session.Broadcast(msg.Chat) } } else { if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { @@ -159,10 +144,10 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D api.CountUnicastChat() } - api.Unicast(session, msg.Chat.To, msg.Chat) + session.Unicast(msg.Chat.To, msg.Chat) if msg.Chat.Chat.Mid != "" { // Send out delivery confirmation status chat message. - api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) + session.Unicast(session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) } } case "Conference": @@ -173,7 +158,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D // Send conference update to anyone. for _, id := range msg.Conference.Conference { if id != session.Id { - api.Unicast(session, id, msg.Conference) + session.Unicast(id, msg.Conference) } } } @@ -211,7 +196,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D } case "Room": if room, err := api.UpdateRoom(session, msg.Room); err == nil { - api.Broadcast(session, room) + session.Broadcast(room) c.Reply(msg.Iid, room) } else { c.Reply(msg.Iid, err) @@ -221,25 +206,6 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D } } -func (api *channellingAPI) OnDisconnect(session *Session) { - session.Disconnect() - api.LeaveRoom(session) - - dsl := session.DataSessionLeft("hard") - if session.Hello { - api.Broadcast(session, dsl) - } - - session.RunForAllSubscribers(func(session *Session) { - log.Println("Notifying subscriber that we are gone", session.Id) - api.Unicast(session, session.Id, dsl) - }) - - api.Unicaster.OnDisconnect(session) - - api.buddyImages.Delete(session.Id) -} - func (api *channellingAPI) SendSelf(c Responder, session *Session) { token, err := api.EncodeSessionToken(session) if err != nil { @@ -261,26 +227,3 @@ func (api *channellingAPI) SendSelf(c Responder, session *Session) { } c.Reply("", self) } - -func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 { - if s.Status != nil { - status, ok := s.Status.(map[string]interface{}) - if ok && status["buddyPicture"] != nil { - pic := status["buddyPicture"].(string) - if strings.HasPrefix(pic, "data:") { - imageId := api.buddyImages.Update(session.Id, pic[5:]) - if imageId != "" { - status["buddyPicture"] = "img:" + imageId - } - } - } - } - - return session.Update(s) -} - -func (api *channellingAPI) BroadcastSessionStatus(session *Session) { - if session.Hello { - api.Broadcast(session, session.DataSessionStatus()) - } -} diff --git a/src/app/spreed-webrtc-server/channelling_api_test.go b/src/app/spreed-webrtc-server/channelling_api_test.go index 0ee186c1..af9b0f9f 100644 --- a/src/app/spreed-webrtc-server/channelling_api_test.go +++ b/src/app/spreed-webrtc-server/channelling_api_test.go @@ -62,12 +62,12 @@ func (fake *fakeRoomManager) JoinRoom(id string, _ *DataRoomCredentials, session return &DataRoom{Name: id}, fake.joinError } -func (fake *fakeRoomManager) LeaveRoom(session *Session) { - fake.leftID = session.Roomid +func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) { + fake.leftID = roomID } -func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { - fake.broadcasts = append(fake.broadcasts, msg) +func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *DataOutgoing) { + fake.broadcasts = append(fake.broadcasts, outgoing.Data) } func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { @@ -98,8 +98,14 @@ func assertErrorReply(t *testing.T, client *fakeClient, iid, code string) { } func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { - client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} - return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager + client, roomManager := &fakeClient{}, &fakeRoomManager{} + session := &Session{ + attestations: sessionNonces, + Broadcaster: roomManager, + RoomStatusManager: roomManager, + } + session.attestation = &SessionAttestation{s: session} + return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil), client, session, roomManager } func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { diff --git a/src/app/spreed-webrtc-server/client.go b/src/app/spreed-webrtc-server/client.go index 46a57694..1a437605 100644 --- a/src/app/spreed-webrtc-server/client.go +++ b/src/app/spreed-webrtc-server/client.go @@ -42,7 +42,7 @@ type Client interface { ResponseSender Session() *Session Index() uint64 - Close(bool) + Close() } type client struct { @@ -70,10 +70,6 @@ func (client *client) OnText(b Buffer) { } } -func (client *client) OnDisconnect() { - client.ChannellingAPI.OnDisconnect(client.session) -} - func (client *client) Reply(iid string, m interface{}) { outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} if b, err := client.EncodeOutgoing(outgoing); err == nil { diff --git a/src/app/spreed-webrtc-server/connection.go b/src/app/spreed-webrtc-server/connection.go index e1d343fc..f99e405e 100644 --- a/src/app/spreed-webrtc-server/connection.go +++ b/src/app/spreed-webrtc-server/connection.go @@ -56,7 +56,7 @@ const ( type Connection interface { Index() uint64 Send(Buffer) - Close(runCallbacks bool) + Close() readPump() writePump() } @@ -65,7 +65,6 @@ type ConnectionHandler interface { NewBuffer() Buffer OnConnect(Connection) OnText(Buffer) - OnDisconnect() } type connection struct { @@ -98,15 +97,12 @@ func (c *connection) Index() uint64 { return c.Idx } -func (c *connection) Close(runCallbacks bool) { +func (c *connection) Close() { c.mutex.Lock() if c.isClosed { c.mutex.Unlock() return } - if runCallbacks { - c.handler.OnDisconnect() - } c.ws.Close() c.isClosed = true for { @@ -170,7 +166,7 @@ func (c *connection) readPump() { } } - c.Close(true) + c.Close() } // Write message to outbound queue. @@ -269,7 +265,7 @@ func (c *connection) writePump() { cleanup: //fmt.Println("writePump done") timer.Stop() - c.Close(true) + c.Close() } // Write ping message. diff --git a/src/app/spreed-webrtc-server/hub.go b/src/app/spreed-webrtc-server/hub.go index 9c1d1742..9a899f12 100644 --- a/src/app/spreed-webrtc-server/hub.go +++ b/src/app/spreed-webrtc-server/hub.go @@ -48,8 +48,8 @@ type SessionStore interface { type Unicaster interface { SessionStore OnConnect(Client, *Session) - Unicast(session *Session, to string, m interface{}) - OnDisconnect(*Session) + Unicast(to string, outgoing *DataOutgoing) + OnDisconnect(sessionID string) } type ContactManager interface { @@ -158,7 +158,7 @@ func (h *hub) OnConnect(client Client, session *Session) { // Register connection or replace existing one. if ec, ok := h.clients[session.Id]; ok { - ec.Close(false) + ec.Close() //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) } h.clients[session.Id] = client @@ -167,26 +167,20 @@ func (h *hub) OnConnect(client Client, session *Session) { //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) } -func (h *hub) OnDisconnect(session *Session) { +func (h *hub) OnDisconnect(sessionID string) { h.mutex.Lock() - delete(h.clients, session.Id) + delete(h.clients, sessionID) h.mutex.Unlock() } -func (h *hub) GetClient(id string) (client Client, ok bool) { +func (h *hub) GetClient(sessionID string) (client Client, ok bool) { h.mutex.RLock() - client, ok = h.clients[id] + client, ok = h.clients[sessionID] h.mutex.RUnlock() return } -func (h *hub) Unicast(session *Session, to string, m interface{}) { - outgoing := &DataOutgoing{ - From: session.Id, - To: to, - A: session.Attestation(), - Data: m, - } +func (h *hub) Unicast(to string, outgoing *DataOutgoing) { if message, err := h.EncodeOutgoing(outgoing); err == nil { client, ok := h.GetClient(to) if !ok { diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index d05af33f..2bc4e643 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -340,9 +340,9 @@ func runner(runtime phoenix.Runtime) error { roomManager := NewRoomManager(config, codec) hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) - sessionManager := NewSessionManager(config, tickets, sessionSecret) + sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := NewStatsManager(hub, roomManager, sessionManager) - channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages) + channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) diff --git a/src/app/spreed-webrtc-server/room_manager.go b/src/app/spreed-webrtc-server/room_manager.go index fdcd27ad..3de0528f 100644 --- a/src/app/spreed-webrtc-server/room_manager.go +++ b/src/app/spreed-webrtc-server/room_manager.go @@ -29,12 +29,12 @@ import ( type RoomStatusManager interface { RoomUsers(*Session) []*DataSession JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error) - LeaveRoom(*Session) + LeaveRoom(roomID, sessionID string) UpdateRoom(*Session, *DataRoom) (*DataRoom, error) } type Broadcaster interface { - Broadcast(*Session, interface{}) + Broadcast(sessionID, roomID string, outgoing *DataOutgoing) } type RoomStats interface { @@ -84,9 +84,9 @@ func (rooms *roomManager) JoinRoom(id string, credentials *DataRoomCredentials, return roomWorker.Join(credentials, session, sender) } -func (rooms *roomManager) LeaveRoom(session *Session) { - if room, ok := rooms.Get(session.Roomid); ok { - room.Leave(session) +func (rooms *roomManager) LeaveRoom(roomID, sessionID string) { + if room, ok := rooms.Get(roomID); ok { + room.Leave(sessionID) } } @@ -104,29 +104,22 @@ func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoo return room, nil } -func (rooms *roomManager) Broadcast(session *Session, m interface{}) { - outgoing := &DataOutgoing{ - From: session.Id, - A: session.Attestation(), - Data: m, - } - +func (rooms *roomManager) Broadcast(sessionID, roomID string, outgoing *DataOutgoing) { message, err := rooms.EncodeOutgoing(outgoing) if err != nil { return } - id := session.Roomid - if id != "" && id == rooms.globalRoomID { + if roomID != "" && roomID == rooms.globalRoomID { rooms.RLock() for _, room := range rooms.roomTable { - room.Broadcast(session, message) + room.Broadcast(sessionID, message) } rooms.RUnlock() - } else if room, ok := rooms.Get(id); ok { - room.Broadcast(session, message) + } else if room, ok := rooms.Get(roomID); ok { + room.Broadcast(sessionID, message) } else { - log.Printf("No room named %s found for broadcast message %#v", id, m) + log.Printf("No room named %s found for broadcast %#v", roomID, outgoing) } message.Decref() } diff --git a/src/app/spreed-webrtc-server/roomworker.go b/src/app/spreed-webrtc-server/roomworker.go index 95ce744d..f0dea820 100644 --- a/src/app/spreed-webrtc-server/roomworker.go +++ b/src/app/spreed-webrtc-server/roomworker.go @@ -39,9 +39,9 @@ type RoomWorker interface { Users() []*roomUser Update(*DataRoom) error GetUsers() []*DataSession - Broadcast(*Session, Buffer) + Broadcast(sessionID string, buf Buffer) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) - Leave(*Session) + Leave(sessionID string) } type roomWorker struct { @@ -213,12 +213,12 @@ func (r *roomWorker) GetUsers() []*DataSession { return <-out } -func (r *roomWorker) Broadcast(session *Session, message Buffer) { +func (r *roomWorker) Broadcast(sessionID string, message Buffer) { worker := func() { r.mutex.RLock() for id, user := range r.users { - if id == session.Id { + if id == sessionID { // Skip broadcast to self. continue } @@ -261,10 +261,6 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se } } - if session.IsDisconnected() { - log.Println("Refusing to join a disconnected session to room", session.Id) - return - } r.users[session.Id] = &roomUser{session, sender} // NOTE(lcooper): Needs to be a copy, else we risk races with // a subsequent modification of room properties. @@ -277,11 +273,11 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se return result.DataRoom, result.error } -func (r *roomWorker) Leave(session *Session) { +func (r *roomWorker) Leave(sessionID string) { worker := func() { r.mutex.Lock() - if _, ok := r.users[session.Id]; ok { - delete(r.users, session.Id) + if _, ok := r.users[sessionID]; ok { + delete(r.users, sessionID) } r.mutex.Unlock() } diff --git a/src/app/spreed-webrtc-server/session.go b/src/app/spreed-webrtc-server/session.go index 44e30742..fece3c28 100644 --- a/src/app/spreed-webrtc-server/session.go +++ b/src/app/spreed-webrtc-server/session.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "github.com/gorilla/securecookie" + "strings" "sync" "time" ) @@ -32,6 +33,11 @@ import ( var sessionNonces *securecookie.SecureCookie type Session struct { + SessionManager + Unicaster + Broadcaster + RoomStatusManager + buddyImages ImageCache Id string Sid string Ua string @@ -52,16 +58,21 @@ type Session struct { disconnected bool } -func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session { +func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session { session := &Session{ - Id: id, - Sid: sid, - Prio: 100, - stamp: time.Now().Unix(), - attestations: attestations, - subscriptions: make(map[string]*Session), - subscribers: make(map[string]*Session), + SessionManager: manager, + Unicaster: unicaster, + Broadcaster: broadcaster, + RoomStatusManager: rooms, + buddyImages: buddyImages, + Id: id, + Sid: sid, + Prio: 100, + stamp: time.Now().Unix(), + attestations: attestations, + subscriptions: make(map[string]*Session), + subscribers: make(map[string]*Session), } session.NewAttestation() return session @@ -104,52 +115,144 @@ func (s *Session) RemoveSubscriber(id string) { s.mutex.Unlock() } -func (s *Session) RunForAllSubscribers(f func(session *Session)) { - +func (s *Session) JoinRoom(roomID string, credentials *DataRoomCredentials, sender Sender) (*DataRoom, error) { s.mutex.Lock() - for _, session := range s.subscribers { - s.mutex.Unlock() - f(session) - s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Hello && s.Roomid != roomID { + s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Left", + Id: s.Id, + Status: "soft", + }, + }) } - s.mutex.Unlock() + room, err := s.RoomStatusManager.JoinRoom(roomID, credentials, s, sender) + if err == nil { + s.Hello = true + s.Roomid = roomID + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Joined", + Id: s.Id, + Userid: s.userid, + Ua: s.Ua, + Prio: s.Prio, + }, + }) + } else { + s.Hello = false + } + return room, err } -func (s *Session) Disconnect() { - - s.mutex.Lock() - s.disconnected = true - s.mutex.Unlock() - +func (s *Session) Broadcast(m interface{}) { + s.mutex.RLock() + if s.Hello { + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: m, + }) + } + s.mutex.RUnlock() } -func (s *Session) IsDisconnected() bool { +func (s *Session) BroadcastStatus() { + s.mutex.RLock() + if s.Hello { + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Status", + Id: s.Id, + Userid: s.userid, + Status: s.Status, + Rev: s.UpdateRev, + Prio: s.Prio, + }, + }) + } + s.mutex.RUnlock() +} +func (s *Session) Unicast(to string, m interface{}) { s.mutex.RLock() - defer s.mutex.RUnlock() - return s.disconnected + outgoing := &DataOutgoing{ + From: s.Id, + To: to, + A: s.attestation.Token(), + Data: m, + } + s.mutex.RUnlock() + + s.Unicaster.Unicast(to, outgoing) } func (s *Session) Close() { - s.mutex.Lock() - // Remove foreign references. + + outgoing := &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Left", + Id: s.Id, + Status: "hard", + }, + } + + if s.Hello { + // NOTE(lcooper): If we don't check for Hello here, we could deadlock + // when implicitly creating a room while a user is reconnecting. + s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing) + s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) + } + + for _, session := range s.subscribers { + s.Unicaster.Unicast(session.Id, outgoing) + } + for _, session := range s.subscriptions { session.RemoveSubscriber(s.Id) } - // Remove session cross references. + + s.Unicaster.OnDisconnect(s.Id) + s.SessionManager.DestroySession(s.Id, s.userid) + s.buddyImages.Delete(s.Id) + s.subscriptions = make(map[string]*Session) s.subscribers = make(map[string]*Session) - s.mutex.Unlock() + s.disconnected = true + s.mutex.Unlock() } func (s *Session) Update(update *SessionUpdate) uint64 { - s.mutex.Lock() defer s.mutex.Unlock() + if update.Status != nil { + status, ok := update.Status.(map[string]interface{}) + if ok && status["buddyPicture"] != nil { + pic := status["buddyPicture"].(string) + if strings.HasPrefix(pic, "data:") { + imageId := s.buddyImages.Update(s.Id, pic[5:]) + if imageId != "" { + status["buddyPicture"] = "img:" + imageId + } + } + } + } + for _, key := range update.Types { //fmt.Println("type update", key) @@ -268,50 +371,6 @@ func (s *Session) SetUseridFake(userid string) { } -func (s *Session) DataSessionLeft(state string) *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Left", - Id: s.Id, - Status: state, - } - -} - -func (s *Session) DataSessionJoined() *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Joined", - Id: s.Id, - Userid: s.userid, - Ua: s.Ua, - Prio: s.Prio, - } - -} - -func (s *Session) DataSessionStatus() *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Status", - Id: s.Id, - Userid: s.userid, - Status: s.Status, - Rev: s.UpdateRev, - Prio: s.Prio, - } - -} - func (s *Session) NewAttestation() { s.attestation = &SessionAttestation{ s: s, @@ -319,13 +378,6 @@ func (s *Session) NewAttestation() { s.attestation.Update() } -func (s *Session) Attestation() (attestation string) { - s.mutex.RLock() - attestation = s.attestation.Token() - s.mutex.RUnlock() - return -} - func (s *Session) UpdateAttestation() { s.mutex.Lock() s.attestation.Update() diff --git a/src/app/spreed-webrtc-server/session_manager.go b/src/app/spreed-webrtc-server/session_manager.go index 28019253..43360f7d 100644 --- a/src/app/spreed-webrtc-server/session_manager.go +++ b/src/app/spreed-webrtc-server/session_manager.go @@ -37,14 +37,18 @@ type SessionManager interface { UserStats RetrieveUsersWith(func(*http.Request) (string, error)) CreateSession(*http.Request) *Session - DestroySession(*Session) + DestroySession(sessionID, userID string) Authenticate(*Session, *SessionToken, string) error GetUserSessions(session *Session, id string) []*DataSession } type sessionManager struct { - Tickets sync.RWMutex + Tickets + Unicaster + Broadcaster + RoomStatusManager + buddyImages ImageCache config *Config userTable map[string]*User fakesessionTable map[string]*Session @@ -52,10 +56,14 @@ type sessionManager struct { attestations *securecookie.SecureCookie } -func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager { +func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager { sessionManager := &sessionManager{ - tickets, sync.RWMutex{}, + tickets, + unicaster, + broadcaster, + rooms, + buddyImages, config, make(map[string]*User), make(map[string]*Session), @@ -103,7 +111,7 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess } } - session := NewSession(sessionManager.attestations, st.Id, st.Sid) + session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) if userid != "" { // XXX(lcooper): Should errors be handled here? @@ -113,15 +121,15 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess return session } -func (sessionManager *sessionManager) DestroySession(session *Session) { - session.Close() +func (sessionManager *sessionManager) DestroySession(sessionID, userID string) { + if userID == "" { + return + } sessionManager.Lock() - if suserid := session.Userid(); suserid != "" { - user, ok := sessionManager.userTable[suserid] - if ok && user.RemoveSession(session) { - delete(sessionManager.userTable, suserid) - } + user, ok := sessionManager.userTable[userID] + if ok && user.RemoveSession(sessionID) { + delete(sessionManager.userTable, userID) } sessionManager.Unlock() } @@ -158,7 +166,7 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s session, ok := sessionManager.fakesessionTable[userid] if !ok { st := sessionManager.FakeSessionToken(userid) - session = NewSession(sessionManager.attestations, st.Id, st.Sid) + session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session.SetUseridFake(st.Userid) sessionManager.fakesessionTable[userid] = session } diff --git a/src/app/spreed-webrtc-server/user.go b/src/app/spreed-webrtc-server/user.go index 49c651b6..fa838f05 100644 --- a/src/app/spreed-webrtc-server/user.go +++ b/src/app/spreed-webrtc-server/user.go @@ -57,10 +57,10 @@ func (u *User) AddSession(s *Session) bool { } // Return true if no session left. -func (u *User) RemoveSession(s *Session) bool { +func (u *User) RemoveSession(sessionID string) bool { last := false u.mutex.Lock() - delete(u.sessionTable, s.Id) + delete(u.sessionTable, sessionID) if len(u.sessionTable) == 0 { log.Println("Last session unregistered for user", u.Id) last = true diff --git a/src/app/spreed-webrtc-server/ws.go b/src/app/spreed-webrtc-server/ws.go index 9bba4bfc..3cd7fb1e 100644 --- a/src/app/spreed-webrtc-server/ws.go +++ b/src/app/spreed-webrtc-server/ws.go @@ -69,7 +69,7 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa // Create a new connection instance. session := sessionManager.CreateSession(r) - defer sessionManager.DestroySession(session) + defer session.Close() client := NewClient(codec, channellingAPI, session) conn := NewConnection(connectionCounter.CountConnection(), ws, client)